-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for Eumetsat in weather-dl #160
base: main
Are you sure you want to change the base?
Conversation
raise TimeoutError(f'Customisation took longer than {timeout}s') | ||
self.logger.info('Customisation for product %s: output %s', product, customisation.outputs) | ||
with customisation.stream_output(customisation.outputs[0]) as stream: | ||
with open(output, 'wb') as fdst: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tested that this works? I initially managed to download some files directly in netcdf using this, but recently it has failed with various errors. I think unless this works reliably, we should just support downloading native.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have tested it and this works fine. It seems reliable considering we have handled the library’s limitations. However, I too faced a few issues randomly - might be because the eumdac library is not mature.
Hence, we are thinking in the direction of downloading files in native format (.nat) only as it is more reliable and then process it accordingly for BiqQuery ingestion. WDYT ?
Also do you have an idea on how to open/read native files (.nat) ?
Re: recently it has failed with various errors - The errors reported by eumdac library are incorrect. The message says problems are related to Authentication & Authorisation but actually the problem is occuring due to constraint violation of EITHER max 3 customisation (running+queued) limit OR user workspace exceeding 20GB.
EUMETSAT Data Tailor API throws "You are exceeding your maximum number 3 of queued+running customisations." | ||
error. | ||
|
||
User's personal workspace is restricted to 20 GB. To resolve workspace size exhaust error, please delete |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From what I remember this only applies to the web interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as we are using the eumdac library that internally uses Data Tailor Web Service (DTWS).
See: https://gitlab.eumetsat.int/eumetlab/data-services/eumdac/-/blob/public/eumdac/datatailor.py#L102
Hence we might have to bear with the limitations of (1) max 3 customisation (running+queued) and (2) user workspace of 20GB.
We will also explore Data Tailor standalone and other libraries to check the possibility of integrating the same within the current pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey Rahul. I've gone through another revision and left a few minor notes. However, let's schedule some time for us to meet to discuss a slightly new approach for this change. I think we may be able to maintain our partition strategy moreso that I originally though, after looking at the underlying API. Let's meet to brainstorm the topic.
if client_name == 'eumetsat': | ||
num_requesters_per_key = client.num_requests_per_key( | ||
config.dataset, | ||
bool(config.selection.get('eumetsat_format_conversion_to')) | ||
) | ||
else: | ||
num_requesters_per_key = client.num_requests_per_key( | ||
config.dataset | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be necessary. We already delegate calculating the number of requests per key by the download class. Instead of adding an if statement, let's see if we can add this logic without creating a special case for this client. Refactoring the general case is OK.
def __init__(self, config: Config, level: int = logging.INFO, | ||
config_subsection: t.Optional[t.Tuple] = None) -> None: | ||
super().__init__(config, level) | ||
self.key = config.kwargs.get('api_key', os.environ.get("EUMETSATAPI_KEY")) | ||
self.secret = config.kwargs.get('api_secret', os.environ.get("EUMETSATAPI_SECRET")) | ||
if not self.key and config_subsection: | ||
self.key = config_subsection[1].get('api_key') | ||
if not self.secret and config_subsection: | ||
self.secret = config_subsection[1].get('api_secret') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to pass in a config subsection here? Shouldn't this already be done in the partition step? The standard key is the one from the subsection.
|
||
def download_custom(self, product: eumdac.product.Product, token: eumdac.AccessToken, | ||
chain_config: eumdac.tailor_models.Chain, output: str) -> None: | ||
"""Downloads the prduct after customisation.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: prduct
shutil.copyfileobj(stream, fdst, DEFAULT_READ_BUFFER_SIZE) | ||
|
||
def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None: | ||
selection_ = optimize_selection_partition(selection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we only allow partition by ID, then I think this is unnecessary?
with open(output, 'wb') as fdst: | ||
shutil.copyfileobj(fsrc, fdst, DEFAULT_READ_BUFFER_SIZE) | ||
|
||
def download_custom(self, product: eumdac.product.Product, token: eumdac.AccessToken, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the retry decorator util here, too.
credentials = (self.key, self.secret) | ||
return eumdac.AccessToken(credentials) | ||
|
||
def download_native(self, product: eumdac.product.Product, output: str) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the retry decorator util here.
No description provided.