Update data pipeline to work through with Mindy
n8layman committed Nov 20, 2024
1 parent 18097f1 commit 3e53640
Showing 14 changed files with 4,306 additions and 1,825 deletions.
Binary file modified .env
7 changes: 3 additions & 4 deletions R/AWS_get_folder.R
@@ -195,15 +195,14 @@ AWS_put_files <- function(transformed_file_list,
# Remove the file from AWS if it's present in the folder and on AWS
# but not in the list of successfully transformed files. This file is
# not relevant to the pipeline
if(file %in% s3_files) {
if(file %in% basename(s3_files)) {

outcome <- c(outcome, glue::glue("Cleaning up dangling file {file} from AWS"))

# Remove the file from AWS
s3_download <- s3$delete_object(
s3_delete_receipt <- s3$delete_object(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Prefix = local_folder,
Key = file)
Key = file.path(local_folder, file))
}
}
}
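Two things change in this hunk: the membership test now compares against basename(s3_files) because the S3 listing returns full object keys, and delete_object() receives the full key built with file.path(local_folder, file) rather than a Prefix argument, which delete_object() does not accept. Renaming the result to s3_delete_receipt also signals that the returned value is a delete confirmation, not a download. A minimal sketch of the same cleanup pattern, assuming a paws S3 client and hypothetical bucket, folder, and file names:

# Minimal sketch of the dangling-file cleanup, assuming the paws S3 client.
# The folder and file list below are hypothetical placeholders.
library(paws)

s3 <- paws::s3()
bucket <- Sys.getenv("AWS_BUCKET_ID")
local_folder <- "data/ecmwf_forecasts"                   # hypothetical prefix
transformed_file_list <- "forecast_2024-11.gz.parquet"   # files the pipeline still needs

# list_objects_v2() returns full keys, e.g. "data/ecmwf_forecasts/stale.parquet"
s3_files <- vapply(
  s3$list_objects_v2(Bucket = bucket, Prefix = local_folder)$Contents,
  function(obj) obj$Key,
  character(1)
)

for (file in setdiff(basename(s3_files), basename(transformed_file_list))) {
  # delete_object() needs the full key, so the folder is joined back on.
  s3$delete_object(Bucket = bucket, Key = file.path(local_folder, file))
}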
20 changes: 9 additions & 11 deletions R/calculate_forecasts_anomalies.R
@@ -52,10 +52,6 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
return(save_filename)
}

# Open dataset to transformed data
forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |>
filter(base_date == floor_date(model_dates_selected, unit = "month"))

# Notes:

# 'Anomaly' is the scaled difference between the forecast mean and the historical mean.
@@ -77,7 +73,7 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
# day then left join that in by month as well.

forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |>
filter(base_date == floor_date(model_dates_selected, unit = "month")) |> collect()
filter(base_date == floor_date(model_dates_selected, unit = "month"))

historical_means <- arrow::open_dataset(weather_historical_means)

@@ -89,17 +85,16 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
message(glue::glue("Processing forecast interval {lead_interval_start}-{lead_interval_end} days out"))

# Get a tibble of all the dates in the anomaly forecast range for the given lead interval
# and join in the historical weather mean data.
model_dates <- tibble(date = seq(from = model_dates_selected + lead_interval_start, to = model_dates_selected + lead_interval_end - 1, by = 1)) |>
mutate(doy = lubridate::yday(date), # Calculate day of year
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = month(date), # Extract month
year = year(date), # Extract year
lead_interval_start = lead_interval_start, # Store lead interval duration
lead_interval_end = lead_interval_end) # Store lead interval duration
lead_interval_end = lead_interval_end) # Store lead interval duration

# Join historical means based on day of year (doy)
model_dates <- model_dates |>
left_join(historical_means |> filter(doy >= min(model_dates$doy)) |> filter(doy < max(model_dates$doy)) |> collect(), by = c("doy"))
model_dates <- historical_means |> filter(doy >= min(model_dates$doy)) |>
right_join(model_dates, by = c("doy"))

# Join in forecast data based on x, y, month, and year.
# The historical data and forecast data _should_ have the same column
@@ -131,12 +126,15 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
# Clean up intermediate columns
model_dates |>
mutate(date = model_dates_selected) |>
select(x, y, date, 'lead_interval_start', 'lead_interval_end', starts_with("anomaly"))
select(x, y, date, 'lead_interval_start', 'lead_interval_end', starts_with("anomaly")) |>
collect()
})

# Write output to a parquet file
arrow::write_parquet(forecasts_anomalies, save_filename, compression = "gzip", compression_level = 5)

rm(forecasts_anomalies)

return(save_filename)

}
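The net effect of these edits: the duplicated early open_dataset() call is removed, the remaining one no longer calls collect() so the forecast data stays a lazy Arrow query, doy is coerced to integer (presumably so the join key types agree with the historical means), the filtered historical means are right_join()ed onto the date grid, and collect() now runs once per lead interval just before the anomaly columns are returned, with rm(forecasts_anomalies) releasing the combined result after it is written to parquet. The anomaly itself is the quantity the comments describe: the difference between the forecast mean and the historical mean, scaled by the historical standard deviation. A small worked sketch of that calculation, with illustrative column names and values since the full schema is not visible in this diff:

# Worked sketch of the scaled anomaly described in the comments above.
# Column names and values are illustrative, not the pipeline's actual schema.
library(dplyr)

joined <- tibble::tibble(
  x = 1, y = 1, doy = 121L,
  forecast_mean   = 31.2,  # forecast mean for this cell and lead interval
  historical_mean = 28.0,  # historical mean for the matching day of year
  historical_sd   = 1.6    # historical standard deviation for that day of year
)

joined |>
  mutate(
    anomaly        = forecast_mean - historical_mean,                   # 3.2
    anomaly_scaled = (forecast_mean - historical_mean) / historical_sd  # 2.0
  )

With arrow, the same mutate() can be expressed on the lazy dataset, so deferring collect() to the end of the per-interval pipeline avoids materializing intermediate tables.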
218 changes: 132 additions & 86 deletions R/calculate_ndvi_anomalies.R
@@ -1,108 +1,154 @@
#' Calculate NDVI Anomalies
#'
#' This function calculates the NDVI (Normalized Difference Vegetation Index) anomalies for a specified date.
#' The anomalies are calculated by comparing lagged, weighted daily NDVI values to historical means and standard deviations for overlapping days of the year.
#' The function writes the anomalies into a GZ parquet file and returns its path.
#' This function calculates the NDVI anomalies using the transformed NDVI data from Sentinel and MODIS satellites,
#' historical NDVI means data, and dates provided for a selected model. The calculated anomalies are saved
#' as a parquet file in a designated directory.
#'
#' @author Emma Mendelsohn
#' @author Nathan Layman and Emma Mendelsohn
#'
#' @param ndvi_date_lookup A data frame containing the filenames for each day along with their respective dates for retrieval.
#' @param ndvi_historical_means A character vector of file paths to gz parquet files containing historical means for each grid cell and day of the year.
#' @param ndvi_anomalies_directory The directory to write the gz parquet file of anomalies.
#' @param model_dates_selected Model dates selected.
#' @param lag_intervals A numeric vector defining the start day for each lag period.
#' @param overwrite A boolean indicating whether existing files in the directory for the specified date should be overwritten.
#' @param ... Additional arguments not used by this function but included for generic function compatibility.
#' @param sentinel_ndvi_transformed Transformed Sentinel NDVI data
#' @param modis_ndvi_transformed Transformed MODIS NDVI data
#' @param ndvi_historical_means Historical NDVI means data
#' @param ndvi_anomalies_directory Directory where the calculated NDVI anomalies should be saved
#' @param model_dates_selected Dates for the selected model
#' @param overwrite Boolean flag indicating whether existing file of NDVI anomalies should be overwritten; Default is FALSE
#' @param ... Additional arguments not used by this function but included for generic function compatibility
#'
#' @return A string containing the path to the gz parquet file written by this function.
#' @return A string containing the filepath to the calculated NDVI anomalies parquet file
#'
#' @note If a file already exists in the directory for the specified date and 'overwrite' is 'FALSE', this function will return the existing file path without performing any calculations.
#' @note If the parquet file of anomalies already exists at the target filepath and overwrite is set to FALSE,
#' then, the existing file is returned without any calculations
#'
#' @examples
#' calculate_ndvi_anomalies(ndvi_date_lookup=my_lookup,
#' ndvi_historical_means=historical_means,
#' ndvi_anomalies_directory='./anomalies',
#' model_dates_selected=as.Date('2020-05-01'),
#' lag_intervals=c(1, 7, 14, 21, 30),
#' overwrite=TRUE)
#' calculate_ndvi_anomalies('sentinel_data_transformed', 'modis_data_transformed',
#' 'historical_means', '/directory_path/', 'model_date', overwrite = TRUE)
#'
#' @export
calculate_ndvi_anomalies <- function(ndvi_date_lookup,
#' @export
calculate_ndvi_anomalies <- function(sentinel_ndvi_transformed,
modis_ndvi_transformed,
ndvi_historical_means,
ndvi_anomalies_directory,
model_dates_selected,
lag_intervals,
model_dates_selected,
overwrite = FALSE,
...) {

# Check that we're only working on one date at a time
stopifnot(length(model_dates_selected) == 1)

# Set filename
date_selected <- model_dates_selected
ndvi_anomalies_filename <- file.path(ndvi_anomalies_directory, glue::glue("ndvi_anomaly_{date_selected}.gz.parquet"))
save_filename <- file.path(ndvi_anomalies_directory, glue::glue("ndvi_anomaly_{model_dates_selected}.gz.parquet"))
message(paste0("Calculating ndvi anomalies for ", model_dates_selected))

# Set up safe way to read parquet files
# Check if file already exists and can be read
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

# Check if outbreak_history file exist and can be read and that we don't want to overwrite them.
if(!is.null(error_safe_read_parquet(ndvi_anomalies_filename)) & !overwrite) {
message(glue::glue("{basename(ndvi_anomalies_filename)} already exists and can be loaded, skipping download and processing."))
return(ndvi_anomalies_filename)
if(!is.null(error_safe_read_parquet(save_filename)) & !overwrite) {
message("file already exists and can be loaded, skipping download")
return(save_filename)
}

message(paste0("Calculating NDVI anomalies for ", date_selected))
# Open dataset to transformed data
ndvi_transformed_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed)) |>
filter(date == model_dates_selected)

# Get the lagged anomalies for selected dates, mapping over the lag intervals
lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)]) # 1 to start with previous day
lag_intervals_end <- lag_intervals # 30 days total including end day
# Open dataset to historical ndvi data
historical_means <- arrow::open_dataset(ndvi_historical_means) |> filter(doy == lubridate::yday(model_dates_selected))

anomalies <- map2(lag_intervals_start, lag_intervals_end, function(start, end){

# get lag dates, removing doy 366
lag_dates <- seq(date_selected - end, date_selected - start, by = "day")
lag_doys <- yday(lag_dates)
if(366 %in% lag_doys){
lag_doys <- lag_doys[lag_doys!=366]
lag_doys <- c(head(lag_doys, 1) - 1, lag_doys)
}

# Get historical means for lag period
doy_start <- head(lag_doys, 1)
doy_end <- tail(lag_doys, 1)
doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")

historical_means <- arrow::read_parquet(ndvi_historical_means[str_detect(ndvi_historical_means, doy_range)])
assertthat::assert_that(nrow(historical_means) > 0)

# get files and weights for the calculations
weights <- ndvi_date_lookup |>
mutate(lag_date = map(lookup_dates, ~. %in% lag_dates)) |>
mutate(weight = unlist(map(lag_date, sum))) |>
filter(weight > 0) |>
select(start_date, filename, weight)

ndvi_dataset <- arrow::open_dataset(weights$filename)

# Lag: calculate mean by pixel for the preceding x days
lagged_means <- ndvi_dataset |>
left_join(weights |> select(-filename)) |>
group_by(x, y) |>
summarize(lag_ndvi_mean = sum(ndvi * weight)/ sum(weight)) |>
ungroup()

# Join in historical means to calculate anomalies raw and scaled
full_join(lagged_means, historical_means, by = c("x", "y")) |>
mutate(!!paste0("anomaly_ndvi_", end) := lag_ndvi_mean - historical_ndvi_mean,
!!paste0("anomaly_ndvi_scaled_", end) := (lag_ndvi_mean - historical_ndvi_mean)/historical_ndvi_sd) |>
select(-starts_with("lag"), -starts_with("historical"))
}) |>
reduce(left_join, by = c("x", "y")) |>
mutate(date = date_selected) |>
relocate(date)

# Save as parquet
arrow::write_parquet(anomalies, ndvi_anomalies_filename, compression = "gzip", compression_level = 5)

return(ndvi_anomalies_filename)
}

# Join the two datasets by day of year (doy)
ndvi_transformed_dataset <- left_join(ndvi_transformed_dataset, historical_means, by = c("x","y","doy"), suffix = c("", "_historical"))

# Calculate ndvi anomalies
ndvi_transformed_dataset <- ndvi_transformed_dataset |>
mutate(anomaly_ndvi = ndvi - ndvi_historical,
anomaly_scaled_ndvi = anomaly_ndvi / ndvi_sd)

# Remove intermediate columns
ndvi_transformed_dataset <- ndvi_transformed_dataset |>
select(x, y, date, starts_with("anomaly"))

# Save as parquet
arrow::write_parquet(ndvi_transformed_dataset, save_filename, compression = "gzip", compression_level = 5)

return(save_filename)
}
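
The rewritten function replaces the lag-weighted approach with a direct day-of-year comparison: the transformed Sentinel and MODIS rows for the selected date are joined to the historical means on x, y, and doy, and the anomaly is the difference from the historical mean scaled by the historical standard deviation. Below is a minimal sketch of that join-and-scale step with toy values, using the column names visible in the diff; the commented-out block that follows preserves the previous lag-based implementation for reference.

# Minimal sketch of the new day-of-year anomaly join; toy values are hypothetical.
library(dplyr)
library(lubridate)

model_date <- as.Date("2020-05-01")

ndvi_today <- tibble::tibble(
  x = 1, y = 1, date = model_date,
  doy = as.integer(yday(model_date)),
  ndvi = 0.62
)

historical_means <- tibble::tibble(
  x = 1, y = 1, doy = as.integer(yday(model_date)),
  ndvi = 0.55,     # becomes ndvi_historical after the suffixed join
  ndvi_sd = 0.05
)

ndvi_today |>
  left_join(historical_means, by = c("x", "y", "doy"),
            suffix = c("", "_historical")) |>
  mutate(anomaly_ndvi = ndvi - ndvi_historical,
         anomaly_scaled_ndvi = anomaly_ndvi / ndvi_sd) |>
  select(x, y, date, starts_with("anomaly"))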



# calculate_ndvi_anomalies <- function(ndvi_date_lookup,
# ndvi_historical_means,
# ndvi_anomalies_directory,
# model_dates_selected,
# lag_intervals,
# overwrite = FALSE,
# ...) {
# # Set filename
# date_selected <- model_dates_selected
# ndvi_anomalies_filename <- file.path(ndvi_anomalies_directory, glue::glue("ndvi_anomaly_{date_selected}.gz.parquet"))
#
# # Set up safe way to read parquet files
# error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)
#
# # Check if outbreak_history file exist and can be read and that we don't want to overwrite them.
# if(!is.null(error_safe_read_parquet(ndvi_anomalies_filename)) & !overwrite) {
# message(glue::glue("{basename(ndvi_anomalies_filename)} already exists and can be loaded, skipping download and processing."))
# return(ndvi_anomalies_filename)
# }
#
# message(paste0("Calculating NDVI anomalies for ", date_selected))
#
# # Get the lagged anomalies for selected dates, mapping over the lag intervals
# lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)]) # 1 to start with previous day
# lag_intervals_end <- lag_intervals # 30 days total including end day
#
# anomalies <- map2(lag_intervals_start, lag_intervals_end, function(start, end){
#
# # get lag dates, removing doy 366
# lag_dates <- seq(date_selected - end, date_selected - start, by = "day")
# lag_doys <- yday(lag_dates)
# if(366 %in% lag_doys){
# lag_doys <- lag_doys[lag_doys!=366]
# lag_doys <- c(head(lag_doys, 1) - 1, lag_doys)
# }
#
# # Get historical means for lag period
# doy_start <- head(lag_doys, 1)
# doy_end <- tail(lag_doys, 1)
# doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
# doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
# doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")
#
# historical_means <- arrow::read_parquet(ndvi_historical_means[str_detect(ndvi_historical_means, doy_range)])
# assertthat::assert_that(nrow(historical_means) > 0)
#
# # get files and weights for the calculations
# weights <- ndvi_date_lookup |>
# mutate(lag_date = map(lookup_dates, ~. %in% lag_dates)) |>
# mutate(weight = unlist(map(lag_date, sum))) |>
# filter(weight > 0) |>
# select(start_date, filename, weight)
#
# ndvi_dataset <- arrow::open_dataset(weights$filename)
#
# # Lag: calculate mean by pixel for the preceding x days
# lagged_means <- ndvi_dataset |>
# left_join(weights |> select(-filename)) |>
# group_by(x, y) |>
# summarize(lag_ndvi_mean = sum(ndvi * weight)/ sum(weight)) |>
# ungroup()
#
# # Join in historical means to calculate anomalies raw and scaled
# full_join(lagged_means, historical_means, by = c("x", "y")) |>
# mutate(!!paste0("anomaly_ndvi_", end) := lag_ndvi_mean - historical_ndvi_mean,
# !!paste0("anomaly_ndvi_scaled_", end) := (lag_ndvi_mean - historical_ndvi_mean)/historical_ndvi_sd) |>
# select(-starts_with("lag"), -starts_with("historical"))
# }) |>
# reduce(left_join, by = c("x", "y")) |>
# mutate(date = date_selected) |>
# relocate(date)
#
# # Save as parquet
# arrow::write_parquet(anomalies, ndvi_anomalies_filename, compression = "gzip", compression_level = 5)
#
# return(ndvi_anomalies_filename)
# }
#
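
Both anomaly functions share the same reuse-or-recompute guard: purrr::possibly() wraps arrow::open_dataset() so a missing or unreadable file yields NULL instead of an error, and the function returns the existing path early unless overwrite = TRUE. A generalized sketch of that idiom, with get_or_compute() as a hypothetical helper name rather than a function in this repository:

# Sketch of the caching guard used by both anomaly functions.
library(purrr)
library(arrow)

# Returns NULL instead of throwing if the file is missing or unreadable.
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

get_or_compute <- function(save_filename, compute, overwrite = FALSE) {
  if (!is.null(error_safe_read_parquet(save_filename)) && !overwrite) {
    message(basename(save_filename), " already exists and can be loaded, skipping.")
    return(save_filename)
  }
  compute(save_filename)  # otherwise (re)build the file and return its path
}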