generated from ecohealthalliance/container-template
Commit
Resolve incompatible types when merging parquet files. This involved simplifying soil_preprocessed as well. Figured out how to do the multiple join without collecting, so everything stays in Arrow until the hive write. Static layers working; still need to fix dynamic layers.
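For context, a minimal sketch (not code from this commit) of one way to resolve incompatible column types when several parquet files are opened as a single Arrow dataset: supply an explicit unified schema so compatible casts happen at scan time, keeping the data out of R memory. The file names, column names, and types below are hypothetical.

library(arrow)

# Hypothetical unified schema: suppose "value" was written as int32 in some
# files and as double in others, which breaks schema inference on open.
unified_schema <- schema(
  x     = float64(),
  y     = float64(),
  value = float64()
)

# Passing the schema casts each file's columns to the shared types on scan,
# so nothing needs to be collect()ed into R to reconcile the files.
ds <- open_dataset(c("part-1.parquet", "part-2.parquet"), schema = unified_schema)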
Showing 11 changed files with 1,977 additions and 1,064 deletions.
@@ -1,68 +1,31 @@
-#' Augment Weather Data
+#' Augment and store disparate datasets
 #'
-#' This function collects data from three different sources, checks for missing values,
-#' combines them into a single dataset, and saves the augmented data as a partitioned dataset
-#' in parquet format to a specified directory.
+#' This function ingests multiple datasets, augments them, and stores them in Parquet format in a specified directory.
 #'
-#' @author Emma Mendelsohn
+#' @author Nathan C. Layman
 #'
-#' @param weather_anomalies File path to the weather anomalies dataset.
-#' @param forecasts_anomalies File path to the forecasts anomalies dataset.
-#' @param ndvi_anomalies File path to the NDVI anomalies dataset.
-#' @param augmented_data_directory Directory where the augmented data will be saved in parquet format.
+#' @param augmented_data_sources The list of data sources to be augmented. These sources are ingested as Arrow datasets.
+#' @param augmented_data_directory The directory where augmented datasets are to be stored.
+#' @param ... Additional arguments not used by this function but included for potential function extensions.
 #'
-#' @return A string containing the file path to the directory where the augmented data is saved.
+#' @return A vector of strings containing the filepaths to the newly created Parquet files.
 #'
-#' @note This function performs a left join of the three datasets on the date, x, and y variables.
-#' Any NA values in the 'date', 'x', and 'y' columns of the dataset will be dropped. The function
-#' saves the resulting dataset in the specified directory using hive partitioning by date.
+#' @note This function uses Apache Arrow for data ingestion and to write Parquet files. The output files are partitioned by date and compressed using gzip.
 #'
 #' @examples
-#' augment_data(weather_anomalies = 'path/to/weather_data',
-#'              forecasts_anomalies = 'path/to/forecast_data',
-#'              ndvi_anomalies = 'path/to/ndvi_data',
-#'              augmented_data_directory = 'path/to/save/augmented_data')
+#' augment_data(augmented_data_sources = list("dataset1.csv", "dataset2.feather"),
+#'              augmented_data_directory = "./data")
 #'
 #' @export
-augment_data <- function(weather_anomalies,
-                         forecasts_anomalies,
-                         ndvi_anomalies,
+augment_data <- function(augmented_data_sources,
                          augmented_data_directory,
                          overwrite = FALSE,
                          ...) {

-  # Figure out how to do all this OUT of memory.
-  message("Loading datasets into memory")
-  weather <- arrow::open_dataset(weather_anomalies) |> dplyr::collect()
-  forecasts <- arrow::open_dataset(forecasts_anomalies) |> dplyr::collect()
-  ndvi <- arrow::open_dataset(ndvi_anomalies) |> dplyr::collect()
-
-  message("NA checks")
-  ## Weather and forecasts
-  ### NAs are in scaled precip data, due to days with 0 precip
-  weather_check <- purrr::map_lgl(weather, ~any(is.na(.)))
-  assertthat::assert_that(all(str_detect(names(weather_check[weather_check]), "scaled")))
-
-  forecasts_check <- purrr::map_lgl(forecasts, ~any(is.na(.)))
-  assertthat::assert_that(all(str_detect(names(forecasts_check[forecasts_check]), "scaled")))
-
-  ## NDVI
-  ### Prior to 2018: NAs are due to region missing from Eastern Africa in modis data
-  ### After 2018: NAs are due to smaller pockets of missing data on a per-cycle basis
-  ### okay to remove when developing RSA model (issue #72)
-  ndvi_check <- purrr::map_lgl(ndvi, ~any(is.na(.)))
-  assertthat::assert_that(!any(ndvi_check[c("date", "x", "y")]))
-  ndvi <- drop_na(ndvi)
-
-  message("Join into a single object")
-  augmented_data <- left_join(weather, forecasts, by = join_by(date, x, y)) |>
-    left_join(ndvi, by = join_by(date, x, y))
+  # DON'T collect if at all possible. Keep everything in arrow to keep it out
+  # of memory until the last possible moment before hive writing
+  ds <- reduce(map(unlist(augmented_data_sources$static_layers), arrow::open_dataset), dplyr::left_join, by = c("x", "y"))

-  message("Save as parquets using hive partitioning by date")
-  augmented_data |>
-    group_by(date) |>
-    write_dataset(augmented_data_directory)
+  ds |> mutate(hive_date = date) |>
+    group_by(hive_date) |>
+    arrow::write_dataset(augmented_data_directory, compression = "gzip", compression_level = 5)

-  return(augmented_data_directory)
+  return(list.files(augmented_data_directory, pattern = ".parquet", recursive = TRUE, full.names = TRUE))
 }
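A hedged usage sketch of the new signature (the paths are placeholders, and only the static_layers element of the list is consumed by the code above):

# Placeholder inputs: augment_data() opens each static layer as an Arrow
# dataset, reduce-joins them by x and y, and writes gzip-compressed parquet
# partitioned by hive_date before returning the new file paths.
parquet_files <- augment_data(
  augmented_data_sources = list(static_layers = c("data/soil.parquet",
                                                  "data/landcover.parquet")),
  augmented_data_directory = "data/augmented"
)

Because the join result is still an Arrow query rather than a data frame, no rows are materialized in R until arrow::write_dataset() streams them to disk.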
@@ -0,0 +1,68 @@
+#' Augment Weather Data
+#'
+#' This function collects data from three different sources, checks for missing values,
+#' combines them into a single dataset, and saves the augmented data as a partitioned dataset
+#' in parquet format to a specified directory.
+#'
+#' @author Emma Mendelsohn
+#'
+#' @param weather_anomalies File path to the weather anomalies dataset.
+#' @param forecasts_anomalies File path to the forecasts anomalies dataset.
+#' @param ndvi_anomalies File path to the NDVI anomalies dataset.
+#' @param augmented_data_directory Directory where the augmented data will be saved in parquet format.
+#'
+#' @return A string containing the file path to the directory where the augmented data is saved.
+#'
+#' @note This function performs a left join of the three datasets on the date, x, and y variables.
+#' Any NA values in the 'date', 'x', and 'y' columns of the dataset will be dropped. The function
+#' saves the resulting dataset in the specified directory using hive partitioning by date.
+#'
+#' @examples
+#' augment_data(weather_anomalies = 'path/to/weather_data',
+#'              forecasts_anomalies = 'path/to/forecast_data',
+#'              ndvi_anomalies = 'path/to/ndvi_data',
+#'              augmented_data_directory = 'path/to/save/augmented_data')
+#'
+#' @export
+augment_data_old <- function(weather_anomalies,
+                             forecasts_anomalies,
+                             ndvi_anomalies,
+                             augmented_data_directory,
+                             overwrite = FALSE,
+                             ...) {

+  # Figure out how to do all this OUT of memory.
+  message("Loading datasets into memory")
+  weather <- arrow::open_dataset(weather_anomalies) |> dplyr::collect()
+  forecasts <- arrow::open_dataset(forecasts_anomalies) |> dplyr::collect()
+  ndvi <- arrow::open_dataset(ndvi_anomalies) |> dplyr::collect()

+  message("NA checks")
+  ## Weather and forecasts
+  ### NAs are in scaled precip data, due to days with 0 precip
+  weather_check <- purrr::map_lgl(weather, ~any(is.na(.)))
+  assertthat::assert_that(all(str_detect(names(weather_check[weather_check]), "scaled")))

+  forecasts_check <- purrr::map_lgl(forecasts, ~any(is.na(.)))
+  assertthat::assert_that(all(str_detect(names(forecasts_check[forecasts_check]), "scaled")))

+  ## NDVI
+  ### Prior to 2018: NAs are due to region missing from Eastern Africa in modis data
+  ### After 2018: NAs are due to smaller pockets of missing data on a per-cycle basis
+  ### okay to remove when developing RSA model (issue #72)
+  ndvi_check <- purrr::map_lgl(ndvi, ~any(is.na(.)))
+  assertthat::assert_that(!any(ndvi_check[c("date", "x", "y")]))
+  ndvi <- drop_na(ndvi)

+  message("Join into a single object")
+  augmented_data <- left_join(weather, forecasts, by = join_by(date, x, y)) |>
+    left_join(ndvi, by = join_by(date, x, y))

+  message("Save as parquets using hive partitioning by date")
+  augmented_data |>
+    group_by(date) |>
+    write_dataset(augmented_data_directory)

+  return(augmented_data_directory)

+}
This file was deleted.