fix corrupt data when uploading to object storage using fsspec (#296)
* fix corrupt data when uploading to object storage using fsspec

* add test coverage
danangmassandy authored Dec 11, 2024
1 parent 404fe83 commit 3d1323f
Showing 5 changed files with 246 additions and 121 deletions.
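The fix follows the same pattern in every upload path: the old code opened the destination through fsspec ('simplecache::s3://…') and streamed into that handle, which could produce corrupt objects; the new code writes the complete file to a local tempfile.NamedTemporaryFile and only then hands it to the django-storages S3Boto3Storage backend ("gap_products"), which performs the upload with the configured TransferConfig. A minimal sketch of the new pattern, assuming a pandas DataFrame and an object key (upload_dataframe, df and output_key are illustrative names, not part of the patch):

import tempfile

from django.core.files.storage import storages


def upload_dataframe(df, output_key, separator=','):
    """Sketch of the temp-file-then-save upload used throughout this commit."""
    s3_storage = storages["gap_products"]
    with tempfile.NamedTemporaryFile(
            suffix='.csv', delete=True, delete_on_close=False) as tmp_file:
        # write the whole file to local disk first
        df.to_csv(tmp_file.name, index=True, sep=separator)
        # then let the storage backend perform the S3 upload
        s3_storage.save(output_key, tmp_file)
    return output_key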
7 changes: 6 additions & 1 deletion django_project/core/settings/project.py
@@ -50,6 +50,11 @@
use_threads=True,
max_concurrency=10
)
AWS_PRODUCTS_TRANSFER_CONFIG = TransferConfig(
multipart_chunksize=300 * MB,
use_threads=True,
max_concurrency=4
)
MINIO_AWS_ACCESS_KEY_ID = os.environ.get("MINIO_AWS_ACCESS_KEY_ID")
MINIO_AWS_SECRET_ACCESS_KEY = os.environ.get("MINIO_AWS_SECRET_ACCESS_KEY")
MINIO_AWS_BUCKET_NAME = os.environ.get("MINIO_AWS_BUCKET_NAME")
@@ -80,7 +85,7 @@
"bucket_name": os.environ.get("MINIO_GAP_AWS_BUCKET_NAME"),
"file_overwrite": False,
"max_memory_size": 500 * MB, # 500MB
"transfer_config": AWS_TRANSFER_CONFIG,
"transfer_config": AWS_PRODUCTS_TRANSFER_CONFIG,
"endpoint_url": MINIO_AWS_ENDPOINT_URL
},
}
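For context, TransferConfig comes from boto3 and controls how uploads are split into multipart chunks; the new AWS_PRODUCTS_TRANSFER_CONFIG used for the products bucket uploads 300 MB parts with at most 4 concurrent threads. A sketch of the full definition, assuming the import and the MB constant that are defined outside the shown hunk:

from boto3.s3.transfer import TransferConfig

MB = 1024 * 1024  # assumed definition; not shown in this hunk

# products bucket: large multipart chunks, modest concurrency
AWS_PRODUCTS_TRANSFER_CONFIG = TransferConfig(
    multipart_chunksize=300 * MB,
    use_threads=True,
    max_concurrency=4,
)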
34 changes: 21 additions & 13 deletions django_project/gap/providers/observation.py
@@ -15,7 +15,8 @@
from django.contrib.gis.geos import Polygon, Point
from django.contrib.gis.db.models.functions import Distance, GeoFunc
from typing import List, Tuple, Union
from fsspec.core import OpenFile
from django.core.files.storage import storages
from storages.backends.s3boto3 import S3Boto3Storage

from gap.models import (
Dataset,
@@ -210,19 +211,20 @@ def to_csv_stream(self, suffix='.csv', separator=','):
sep=separator
)

def to_csv(self, suffix='.csv', separator=',', **kwargs):
def to_csv(self, suffix='.csv', separator=','):
"""Generate csv file to object storage."""
headers, _ = self._get_headers(use_station_id=True)

# get dataframe
df_pivot = self._get_data_frame(use_station_id=True)

outfile: OpenFile = None
outfile, output = self._get_fsspec_remote_url(
suffix, mode='w', **kwargs
)
output = self._get_file_remote_url(suffix)
s3_storage: S3Boto3Storage = storages["gap_products"]

with outfile as tmp_file:
with (
tempfile.NamedTemporaryFile(
suffix=suffix, delete=True, delete_on_close=False)
) as tmp_file:
# write headers
write_headers = True

@@ -241,6 +243,8 @@ def to_csv(self, suffix='.csv', separator=',', **kwargs):
if write_headers:
write_headers = False

s3_storage.save(output, tmp_file)

return output

def _get_xarray_dataset(self):
@@ -293,20 +297,24 @@ def to_netcdf_stream(self):
break
yield chunk

def to_netcdf(self, **kwargs):
def to_netcdf(self):
"""Generate netcdf file to object storage."""
ds = self._get_xarray_dataset()

outfile, output = self._get_fsspec_remote_url('.nc', **kwargs)

with outfile as tmp_file:
output_url = self._get_file_remote_url('.nc')
s3_storage: S3Boto3Storage = storages["gap_products"]
with (
tempfile.NamedTemporaryFile(
suffix=".nc", delete=True, delete_on_close=False)
) as tmp_file:
x = ds.to_netcdf(
tmp_file.name, format='NETCDF4', engine='h5netcdf',
compute=False
)
execute_dask_compute(x)

return output
s3_storage.save(output_url, tmp_file)

return output_url

def _to_dict(self) -> dict:
"""Convert into dict.
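With this change, to_csv and to_netcdf return the storage key rather than an fsspec URL, so callers retrieve or expose the result through the same "gap_products" backend. A small usage sketch; reader_value is an illustrative instance, and open/url are standard django-storages Storage methods:

from django.core.files.storage import storages

s3_storage = storages["gap_products"]
output_key = reader_value.to_csv()  # key like '<AWS_DIR_PREFIX>user_data/<uuid>.csv'

# read the uploaded object back, or hand a URL to the client
with s3_storage.open(output_key, 'rb') as f:
    first_bytes = f.read(64)
download_url = s3_storage.url(output_key)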
214 changes: 130 additions & 84 deletions django_project/gap/utils/reader.py
@@ -13,7 +13,8 @@
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Union, List, Tuple
import fsspec
from django.core.files.storage import storages
from storages.backends.s3boto3 import S3Boto3Storage

import numpy as np
import pytz
@@ -450,34 +451,16 @@ def _get_s3_variables(self) -> dict:
'MINIO_GAP_AWS_DIR_PREFIX', '')
return results

def _get_fsspec_remote_url(self, suffix, mode='wb', **kwargs):
def _get_file_remote_url(self, suffix):
# s3 variables to product bucket
s3 = self._get_s3_variables()

output_url = (
f's3://{s3["AWS_BUCKET_NAME"]}/{s3["AWS_DIR_PREFIX"]}'
)
output_url = s3["AWS_DIR_PREFIX"]
if not output_url.endswith('/'):
output_url += '/'
output_url += f'user_data/{uuid.uuid4().hex}{suffix}'

outfile = fsspec.open(
f'simplecache::{output_url}',
mode=mode,
s3={
'key': s3.get('AWS_ACCESS_KEY_ID'),
'secret': s3.get('AWS_SECRET_ACCESS_KEY'),
'endpoint_url': s3.get('AWS_ENDPOINT_URL'),
# 'region_name': s3.get('AWS_REGION_NAME'),
'max_concurrency': kwargs.get('max_concurrency', 2),
'default_block_size': kwargs.get(
'default_block_size',
200 * 1024 * 1024
)
}
)

return outfile, output_url
return output_url

def to_netcdf_stream(self):
"""Generate netcdf stream."""
@@ -489,26 +472,32 @@ def to_netcdf_stream(self):
tmp_file.name, format='NETCDF4', engine='h5netcdf',
compute=False
)
execute_dask_compute(x)
execute_dask_compute(x, is_api=True)
with open(tmp_file.name, 'rb') as f:
while True:
chunk = f.read(self.chunk_size_in_bytes)
if not chunk:
break
yield chunk

def to_netcdf(self, **kwargs):
def to_netcdf(self):
"""Generate netcdf file to object storage."""
outfile, output = self._get_fsspec_remote_url('.nc', **kwargs)
output_url = self._get_file_remote_url('.nc')
s3_storage: S3Boto3Storage = storages["gap_products"]

with outfile as tmp_file:
with (
tempfile.NamedTemporaryFile(
suffix=".nc", delete=True, delete_on_close=False)
) as tmp_file:
x = self.xr_dataset.to_netcdf(
tmp_file.name, format='NETCDF4', engine='h5netcdf',
compute=False
)
execute_dask_compute(x)
execute_dask_compute(x, is_api=True)

return output
s3_storage.save(output_url, tmp_file)

return output_url

def _get_chunk_indices(self, chunks):
indices = []
@@ -526,16 +515,17 @@ def _get_dataset_for_csv(self):
]
# use date chunk = 1 to order by date
rechunk = {
self.date_variable: 1,
'lat': 300,
'lon': 300
self.date_variable: 1
}
if 'lat' in self.xr_dataset.dims:
dim_order.append('lat')
dim_order.append('lon')
rechunk['lat'] = 300
rechunk['lon'] = 300
else:
reordered_cols.insert(0, 'lon')
reordered_cols.insert(0, 'lat')
rechunk[self.date_variable] = 300
if 'ensemble' in self.xr_dataset.dims:
dim_order.append('ensemble')
rechunk['ensemble'] = 50
@@ -560,50 +550,74 @@ def to_csv_stream(self, suffix='.csv', separator=','):
date_indices = self._get_chunk_indices(
ds.chunksizes[self.date_variable]
)
lat_indices = self._get_chunk_indices(ds.chunksizes['lat'])
lon_indices = self._get_chunk_indices(ds.chunksizes['lon'])
write_headers = True

# cannot use dask utils because to_dataframe is not returning
# delayed object
with dask.config.set(
pool=ThreadPoolExecutor(get_num_of_threads(is_api=True))
):
# iterate foreach chunk
for date_start, date_stop in date_indices:
for lat_start, lat_stop in lat_indices:
for lon_start, lon_stop in lon_indices:
slice_dict = {
self.date_variable: slice(date_start, date_stop),
'lat': slice(lat_start, lat_stop),
'lon': slice(lon_start, lon_stop)
}
chunk = ds.isel(**slice_dict)
chunk_df = chunk.to_dataframe(dim_order=dim_order)
chunk_df = chunk_df[reordered_cols]
if 'lat' in dim_order:
lat_indices = self._get_chunk_indices(ds.chunksizes['lat'])
lon_indices = self._get_chunk_indices(ds.chunksizes['lon'])
write_headers = True
# iterate foreach chunk
for date_start, date_stop in date_indices:
for lat_start, lat_stop in lat_indices:
for lon_start, lon_stop in lon_indices:
slice_dict = {
self.date_variable: slice(
date_start, date_stop
),
'lat': slice(lat_start, lat_stop),
'lon': slice(lon_start, lon_stop)
}
chunk = ds.isel(**slice_dict)
chunk_df = chunk.to_dataframe(dim_order=dim_order)
chunk_df = chunk_df[reordered_cols]

if write_headers:
headers = dim_order + list(chunk_df.columns)
yield bytes(
separator.join(headers) + '\n',
'utf-8'
)
write_headers = False
if write_headers:
headers = dim_order + list(chunk_df.columns)
yield bytes(
separator.join(headers) + '\n',
'utf-8'
)
write_headers = False

yield chunk_df.to_csv(
index=True, header=False, float_format='%g',
sep=separator
yield chunk_df.to_csv(
index=True, header=False, float_format='%g',
sep=separator
)
else:
write_headers = True
# iterate foreach chunk
for date_start, date_stop in date_indices:
slice_dict = {
self.date_variable: slice(date_start, date_stop)
}
chunk = ds.isel(**slice_dict)
chunk_df = chunk.to_dataframe(dim_order=dim_order)
chunk_df = chunk_df[reordered_cols]

if write_headers:
headers = dim_order + list(chunk_df.columns)
yield bytes(
separator.join(headers) + '\n',
'utf-8'
)
write_headers = False

def to_csv(self, suffix='.csv', separator=',', **kwargs):
yield chunk_df.to_csv(
index=True, header=False, float_format='%g',
sep=separator
)

def to_csv(self, suffix='.csv', separator=','):
"""Generate csv file to object storage."""
ds, dim_order, reordered_cols = self._get_dataset_for_csv()

date_indices = self._get_chunk_indices(
ds.chunksizes[self.date_variable]
)
lat_indices = self._get_chunk_indices(ds.chunksizes['lat'])
lon_indices = self._get_chunk_indices(ds.chunksizes['lon'])
write_headers = True
output = None

@@ -612,33 +626,65 @@ def to_csv(self, suffix='.csv', separator=',', **kwargs):
with dask.config.set(
pool=ThreadPoolExecutor(get_num_of_threads(is_api=True))
):
outfile, output = self._get_fsspec_remote_url(
suffix, mode='w', **kwargs
)
output = self._get_file_remote_url(suffix)
s3_storage: S3Boto3Storage = storages["gap_products"]
with (
tempfile.NamedTemporaryFile(
suffix=suffix, delete=True, delete_on_close=False)
) as tmp_file:
if 'lat' in dim_order:
lat_indices = self._get_chunk_indices(
ds.chunksizes['lat']
)
lon_indices = self._get_chunk_indices(
ds.chunksizes['lon']
)
# iterate foreach chunk
for date_start, date_stop in date_indices:
for lat_start, lat_stop in lat_indices:
for lon_start, lon_stop in lon_indices:
slice_dict = {
self.date_variable: slice(
date_start, date_stop
),
'lat': slice(lat_start, lat_stop),
'lon': slice(lon_start, lon_stop)
}
chunk = ds.isel(**slice_dict)
chunk_df = chunk.to_dataframe(
dim_order=dim_order
)
chunk_df = chunk_df[reordered_cols]

chunk_df.to_csv(
tmp_file.name, index=True, mode='a',
header=write_headers,
float_format='%g', sep=separator
)
if write_headers:
write_headers = False
else:
# iterate foreach chunk
for date_start, date_stop in date_indices:
slice_dict = {
self.date_variable: slice(
date_start, date_stop
)
}
chunk = ds.isel(**slice_dict)
chunk_df = chunk.to_dataframe(dim_order=dim_order)
chunk_df = chunk_df[reordered_cols]

with outfile as tmp_file:
# iterate foreach chunk
for date_start, date_stop in date_indices:
for lat_start, lat_stop in lat_indices:
for lon_start, lon_stop in lon_indices:
slice_dict = {
self.date_variable: slice(
date_start, date_stop
),
'lat': slice(lat_start, lat_stop),
'lon': slice(lon_start, lon_stop)
}
chunk = ds.isel(**slice_dict)
chunk_df = chunk.to_dataframe(dim_order=dim_order)
chunk_df = chunk_df[reordered_cols]
chunk_df.to_csv(
tmp_file.name, index=True, mode='a',
header=write_headers,
float_format='%g', sep=separator
)
if write_headers:
write_headers = False

chunk_df.to_csv(
tmp_file.name, index=True, mode='a',
header=write_headers,
float_format='%g', sep=separator
)
if write_headers:
write_headers = False
# save to s3
s3_storage.save(output, tmp_file)

return output

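One portability note: tempfile.NamedTemporaryFile(delete=True, delete_on_close=False), used in every new upload path, relies on the delete_on_close parameter introduced in Python 3.12, which makes the named file persist until the with-block exits rather than being removed when the handle is closed. A rough equivalent for older interpreters, where ds, s3_storage and output_url stand for the variables in the patched to_netcdf methods (illustrative, not part of the patch):

import os
import tempfile

tmp = tempfile.NamedTemporaryFile(suffix='.nc', delete=False)
try:
    # write the dataset to the named temp file, then upload it
    x = ds.to_netcdf(
        tmp.name, format='NETCDF4', engine='h5netcdf', compute=False
    )
    execute_dask_compute(x)
    s3_storage.save(output_url, tmp)
finally:
    tmp.close()
    os.unlink(tmp.name)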