Commit
First full tar_make() of all data.
n8layman committed Oct 16, 2024
1 parent 7082dce commit 5403084
Showing 13 changed files with 3,677 additions and 2,557 deletions.
Binary file modified .env
R/AWS_get_folder.R (2 changes: 1 addition & 1 deletion)
@@ -131,7 +131,7 @@ AWS_put_files <- function(transformed_file_list,
local_folder,
...) {

transformed_file_list <- basename(transformed_file_list)
transformed_file_list <- basename(transformed_file_list |> unlist())

# Check if AWS credentials and region are set in the environment
if (any(Sys.getenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")) == "")) {
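The one-line change above guards against transformed_file_list arriving as a nested list (for example, from mapped targets) rather than the plain character vector basename() requires. A minimal illustration, with input values assumed:

files <- list("data/a.parquet", c("data/b.parquet", "data/c.parquet"))
basename(unlist(files))
#> [1] "a.parquet" "b.parquet" "c.parquet"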
R/aggregate_augmented_data_by_adm.R (4 changes: 2 additions & 2 deletions)
@@ -9,8 +9,8 @@
#' @author Emma Mendelsohn
#' @export
aggregate_augmented_data_by_adm <- function(augmented_data,
rsa_polygon,
model_dates_selected) {
rsa_polygon,
model_dates_selected) {

r <- arrow::read_parquet(glue::glue("{augmented_data}/date={model_dates_selected}/part-0.parquet")) |>
rast()
R/augment_data.R (44 changes: 33 additions & 11 deletions)
@@ -1,16 +1,38 @@
#' @title
#' @param weather_anomalies
#' @param forecasts_anomalies
#' @param ndvi_anomalies
#' @param augmented_data_directory
#' @return
#' Augment Weather Data
#'
#' This function collects data from three different sources, checks for missing values,
#' combines them into a single dataset, and saves the augmented data as a partitioned dataset
#' in parquet format to a specified directory.
#'
#' @author Emma Mendelsohn
#'
#' @param weather_anomalies File path to the weather anomalies dataset.
#' @param forecasts_anomalies File path to the forecasts anomalies dataset.
#' @param ndvi_anomalies File path to the NDVI anomalies dataset.
#' @param augmented_data_directory Directory where the augmented data will be saved in parquet format.
#'
#' @return A string containing the file path to the directory where the augmented data is saved.
#'
#' @note This function performs a left join of the three datasets on the date, x, and y variables.
#' Any NA values in the 'date', 'x', and 'y' columns of the dataset will be dropped. The function
#' saves the resulting dataset in the specified directory using hive partitioning by date.
#'
#' @examples
#' augment_data(weather_anomalies = 'path/to/weather_data',
#' forecasts_anomalies = 'path/to/forecast_data',
#' ndvi_anomalies = 'path/to/ndvi_data',
#' augmented_data_directory = 'path/to/save/augmented_data')
#'
#' @export
augment_data <- function(weather_anomalies, forecasts_anomalies,
ndvi_anomalies, augmented_data_directory) {

augment_data <- function(weather_anomalies,
forecasts_anomalies,
ndvi_anomalies,
augmented_data_directory,
overwrite = FALSE,
...) {

message("Load datasets into memory")
# Figure out how to do all this OUT of memory.
message("Loading datasets into memory")
weather <- arrow::open_dataset(weather_anomalies) |> dplyr::collect()
forecasts <- arrow::open_dataset(forecasts_anomalies) |> dplyr::collect()
ndvi <- arrow::open_dataset(ndvi_anomalies) |> dplyr::collect()
@@ -43,4 +65,4 @@ augment_data <- function(weather_anomalies, forecasts_anomalies,

return(augmented_data_directory)

}
}
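Most of the function body is collapsed in this diff view; per the roxygen block it left-joins the three datasets on date, x, and y, drops NA keys, and writes hive-partitioned parquet. A minimal sketch of that shape which would also address the out-of-memory TODO by letting arrow evaluate the joins lazily — the join keys come from the @note above, everything else is assumed:

augmented <- arrow::open_dataset(weather_anomalies) |>
  dplyr::left_join(arrow::open_dataset(forecasts_anomalies), by = c("date", "x", "y")) |>
  dplyr::left_join(arrow::open_dataset(ndvi_anomalies), by = c("date", "x", "y")) |>
  dplyr::filter(!is.na(date) & !is.na(x) & !is.na(y))

augmented |>
  dplyr::group_by(date) |>
  arrow::write_dataset(augmented_data_directory, compression = "gzip", compression_level = 5)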
R/calculate_ndvi_anomalies.R (31 changes: 16 additions & 15 deletions)
@@ -30,22 +30,26 @@
calculate_ndvi_anomalies <- function(ndvi_date_lookup,
ndvi_historical_means,
ndvi_anomalies_directory,
model_dates_selected, lag_intervals,
model_dates_selected,
lag_intervals,
overwrite = FALSE,
...) {

# Set filename
date_selected <- model_dates_selected
save_filename <- glue::glue("ndvi_anomaly_{date_selected}.gz.parquet")
message(paste0("Calculating NDVI anomalies for ", date_selected))
ndvi_anomalies_filename <- file.path(ndvi_anomalies_directory, glue::glue("ndvi_anomaly_{date_selected}.gz.parquet"))

# Set up safe way to read parquet files
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)

# Check if file already exists
existing_files <- list.files(ndvi_anomalies_directory)
if(save_filename %in% existing_files & !overwrite) {
message("file already exists, skipping download")
return(file.path(ndvi_anomalies_directory, save_filename))
# Check that the NDVI anomalies file exists and can be read, and that we don't want to overwrite it.
if(!is.null(error_safe_read_parquet(ndvi_anomalies_filename)) & !overwrite) {
message(glue::glue("{basename(ndvi_anomalies_filename)} already exists and can be loaded, skipping download and processing."))
return(ndvi_anomalies_filename)
}

message(paste0("Calculating NDVI anomalies for ", date_selected))

# Get the lagged anomalies for selected dates, mapping over the lag intervals
lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)]) # 1 to start with previous day
lag_intervals_end <- lag_intervals # 30 days total including end day
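# e.g. lag_intervals = c(30, 60, 90) (assumed example values) gives
# lag_intervals_start = c(1, 31, 61) and lag_intervals_end = c(30, 60, 90):
# three consecutive 30-day windows counting back from date_selected.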
@@ -67,7 +71,7 @@ calculate_ndvi_anomalies <- function(ndvi_date_lookup,
doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")

historical_means <- read_parquet(ndvi_historical_means[str_detect(ndvi_historical_means, doy_range)])
historical_means <- arrow::read_parquet(ndvi_historical_means[str_detect(ndvi_historical_means, doy_range)])
assertthat::assert_that(nrow(historical_means) > 0)

# get files and weights for the calculations
@@ -77,7 +81,7 @@ calculate_ndvi_anomalies <- function(ndvi_date_lookup,
filter(weight > 0) |>
select(start_date, filename, weight)

ndvi_dataset <- open_dataset(weights$filename)
ndvi_dataset <- arrow::open_dataset(weights$filename)

# Lag: calculate mean by pixel for the preceding x days
lagged_means <- ndvi_dataset |>
@@ -97,11 +101,8 @@ calculate_ndvi_anomalies <- function(ndvi_date_lookup,
relocate(date)

# Save as parquet
write_parquet(anomalies, here::here(ndvi_anomalies_directory, save_filename), compression = "gzip", compression_level = 5)

return(file.path(ndvi_anomalies_directory, save_filename))


arrow::write_parquet(anomalies, ndvi_anomalies_filename, compression = "gzip", compression_level = 5)

return(ndvi_anomalies_filename)
}
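A pattern note on the cache check introduced above: the old list.files() membership test would treat a truncated parquet left by an interrupted run as done, while the possibly()-wrapped read only skips work when the file actually parses. A standalone sketch of the pattern, with function and argument names assumed:

library(purrr)

# Return NULL instead of throwing when the file is missing or corrupt.
error_safe_read_parquet <- possibly(arrow::read_parquet, otherwise = NULL)

target_is_cached <- function(out_file, overwrite = FALSE) {
  # Treat the target as done only if it exists AND reads back as valid parquet.
  !is.null(error_safe_read_parquet(out_file)) && !overwrite
}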

R/calculate_ndvi_historical_means.R (21 changes: 12 additions & 9 deletions)
@@ -40,16 +40,19 @@ calculate_ndvi_historical_means <- function(ndvi_historical_means_directory,
doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")

save_filename <- glue::glue("historical_ndvi_mean_doy_{doy_start_frmt}_to_{doy_end_frmt}.gz.parquet")
message(paste("calculating historical ndvi means and standard deviations for doy", doy_start_frmt, "to", doy_end_frmt))
ndvi_historical_means_filename <- file.path(ndvi_historical_means_directory, glue::glue("historical_ndvi_mean_doy_{doy_start_frmt}_to_{doy_end_frmt}.gz.parquet"))

# Check if file already exists
existing_files <- list.files(ndvi_historical_means_directory)
if(save_filename %in% existing_files & !overwrite) {
message("file already exists, skipping download")
return(file.path(ndvi_historical_means_directory, save_filename))
# Set up safe way to read parquet files
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)

# Check that the historical means file exists and can be read, and that we don't want to overwrite it.
if(!is.null(error_safe_read_parquet(ndvi_historical_means_filename)) & !overwrite) {
message(glue::glue("{basename(ndvi_historical_means_filename)} already exists and can be loaded, skipping download and processing."))
return(ndvi_historical_means_filename)
}

message(glue::glue("processing {ndvi_historical_means_filename}"))

# Get for relevant days of the year
doy_select <- yday(seq(dummy_date_start, dummy_date_end, by = "day"))
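# e.g. dummy_date_start = as.Date("2021-01-01") and dummy_date_end = as.Date("2021-01-30")
# (assumed values) give doy_select = 1:30, so the same calendar window is
# matched across every historical year when computing the means below.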

@@ -79,8 +82,8 @@
historical_means <- left_join(historical_means, historical_sds)

# Save as parquet
write_parquet(historical_means, here::here(ndvi_historical_means_directory, save_filename), compression = "gzip", compression_level = 5)
write_parquet(historical_means, ndvi_historical_means_filename, compression = "gzip", compression_level = 5)

return(file.path(ndvi_historical_means_directory, save_filename))
return(ndvi_historical_means_filename)

}
R/combine_anomolies.R (19 changes: 19 additions & 0 deletions)
@@ -0,0 +1,19 @@
combine_anomolies <- function(weather_anomalies,
forecasts_anomalies,
ndvi_anomalies,
combined_anomolies_directory,
...) {

weather <- arrow::open_dataset(weather_anomalies)
forecasts <- arrow::open_dataset(forecasts_anomalies)
ndvi <- arrow::open_dataset(ndvi_anomalies)
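# NOTE: the three dataset handles above are not used below; open_dataset(c(...))
# re-opens all three paths and combines them into a single dataset.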

ds <- arrow::open_dataset(c(weather_anomalies, forecasts_anomalies, ndvi_anomalies))

message("Save as parquets using hive partitioning by date")
ds |>
group_by(date) |>
arrow::write_dataset(combined_anomolies_directory, compression = "gzip", compression_level = 5)

return(list.files(combined_anomolies_directory, pattern = ".parquet", recursive = TRUE, full.names = TRUE))
}
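For reference, each date partition written by this function can be read back on its own — the same hive layout aggregate_augmented_data_by_adm() consumes earlier in this commit; paths shown for illustration:

arrow::read_parquet(
  glue::glue("{combined_anomolies_directory}/date={model_dates_selected}/part-0.parquet")
)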