Commit
Fix annoying dynamic branching / arrow open_dataset bug

For some reason branching over ndvi_years caused problems when filtering datasets opened with arrow. No idea why. Fixed by removing dynamic branching from the ndvi_transformed target.
n8layman committed Dec 10, 2024
1 parent e6d3141 commit d136053
Showing 9 changed files with 1,511 additions and 964 deletions.
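
In short: the ndvi_transformed target no longer branches dynamically over ndvi_years. The full vector of years is passed into a single call to transform_ndvi(), which loops over the year/month combinations itself. A condensed sketch of the shape of that change, written as a fragment of the tar_plan() in _targets.R with the overwrite, error, repository, and cue arguments of the real plan omitted:

# Before: one branch per year, which left empty parquet files downstream
# tar_target(ndvi_transformed,
#            transform_ndvi(modis_ndvi_transformed, sentinel_ndvi_transformed,
#                           ndvi_transformed_directory, ndvi_years, ndvi_months = 1:12),
#            pattern = map(ndvi_years),  # dynamic branching over years
#            format = "file")

# After: a single target; transform_ndvi() receives all of ndvi_years at once
tar_target(ndvi_transformed,
           transform_ndvi(modis_ndvi_transformed, sentinel_ndvi_transformed,
                          ndvi_transformed_directory, ndvi_years, ndvi_months = 1:12),
           format = "file")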
Binary file modified .env
Binary file not shown.
7 changes: 5 additions & 2 deletions R/calculate_ndvi_anomalies.R
@@ -50,8 +50,10 @@ calculate_ndvi_anomalies <- function(ndvi_transformed,
ndvi_transformed_dataset <- arrow::open_dataset(ndvi_transformed) |>
filter(date == model_dates_selected)

model_date_doy <- lubridate::yday(model_dates_selected)

# Open dataset to historical ndvi data
historical_means <- arrow::open_dataset(ndvi_historical_means) |> filter(doy == lubridate::yday(model_dates_selected))
historical_means <- arrow::open_dataset(ndvi_historical_means) |> filter(doy == model_date_doy)

# Join the two datasets by day of year (doy)
ndvi_transformed_dataset <- left_join(ndvi_transformed_dataset, historical_means, by = c("x","y","doy"), suffix = c("", "_historical"))
@@ -66,7 +68,8 @@ calculate_ndvi_anomalies <- function(ndvi_transformed,
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = as.integer(lubridate::month(date)), # Extract month
year = as.integer(lubridate::year(date))) |> # Extract year
select(x, y, date, doy, month, year, starts_with("anomaly"))
select(x, y, date, doy, month, year, starts_with("anomaly")) |>
collect()

# Save as parquet
arrow::write_parquet(ndvi_transformed_dataset, save_filename, compression = "gzip", compression_level = 5)
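
The change to calculate_ndvi_anomalies() does two things: it computes the day of year once, outside the arrow query, and it collect()s the lazy query before writing a single parquet file. A minimal sketch of that pattern under hypothetical paths and a made-up model date, with the anomaly calculation reduced to a single column (both datasets are assumed to carry x, y, and doy columns, as the real transformed and historical-mean outputs do):

library(dplyr)
library(arrow)
library(lubridate)

model_dates_selected <- as.Date("2021-05-15")      # hypothetical model date

# Compute the day of year in plain R; comparing against a plain integer
# inside the arrow query avoids pushing the lubridate call into arrow
model_date_doy <- yday(model_dates_selected)

ndvi_now <- open_dataset("data/ndvi_transformed") |>        # hypothetical path
  filter(date == model_dates_selected)

ndvi_hist <- open_dataset("data/ndvi_historical_means") |>  # hypothetical path
  filter(doy == model_date_doy)

anomalies <- ndvi_now |>
  left_join(ndvi_hist, by = c("x", "y", "doy"), suffix = c("", "_historical")) |>
  mutate(anomaly_ndvi = ndvi - ndvi_historical) |>
  collect()                                        # materialize before writing

write_parquet(anomalies, "data/ndvi_anomaly.gz.parquet",
              compression = "gzip", compression_level = 5)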
48 changes: 32 additions & 16 deletions R/calculate_ndvi_historical_means.R
@@ -30,24 +30,40 @@ calculate_ndvi_historical_means <- function(sentinel_ndvi_transformed,
ndvi_historical_means_directory,
ndvi_historical_means_AWS,
...) {

# Open dataset can handle multi-file datasets larger than can
# fit in memory
ndvi_data <- arrow::open_dataset(c(modis_ndvi_transformed, sentinel_ndvi_transformed))

# Fast because we can avoid collecting until write_parquet
ndvi_historical_means <- map_vec(1:366, .progress = TRUE, function(i) {

filename <- file.path(ndvi_historical_means_directory,
glue::glue("ndvi_historical_mean_doy_{i}.gz.parquet"))

# Open dataset can handle multi-file datasets larger than can
# fit in memory
ndvi_data <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed))
ndvi_data |>
filter(doy == i) |>
group_by(x, y, doy) |>
summarize(ndvi_sd = sd(ndvi, na.rm = T),
ndvi = mean(ndvi, na.rm = T),
.groups = "drop") |>
filter(!is.na(ndvi_sd)) |> # Drop constant values (ndvi_sd == NA)
arrow::write_parquet(filename, compression = "gzip", compression_level = 5)

# Check plot
# ggplot(ndvi_data, aes(x = x, y = y)) +
# geom_tile(aes(fill = ndvi), size = 5) + # Points colored by NDVI
# scale_fill_viridis_c() + # Gradient for NDVI values
# labs(
# title = glue::glue("Combined NDVI Historical Means, doy: {i}"),
# x = "Longitude",
# y = "Latitude",
# color = "NDVI"
# ) +
# theme_minimal()

# Fast because we can avoid collecting until write_parquet
ndvu_historical_means <- map_vec(1:366, .progress = TRUE, function(i) {
filename <- file.path(ndvi_historical_means_directory,
glue::glue("ndvi_historical_mean_doy_{i}.parquet"))
ndvi_data |>
filter(doy == i) |>
group_by(x, y, doy) |>
summarize(across(matches("ndvi"), ~mean(.x)),
across(matches("ndvi"), ~sd(.x), .names = "{.col}_sd")) |>
arrow::write_parquet(filename)

filename
filename
})

return(ndvu_historical_means)
return(ndvi_historical_means)
}
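
The refactored calculate_ndvi_historical_means() opens the combined multi-file dataset once and then writes one parquet file per day of year without ever collecting into memory. A minimal sketch of that loop, using hypothetical input directories and a shortened doy range:

library(dplyr)
library(arrow)
library(purrr)

# Hypothetical directories of transformed NDVI parquet files
ndvi_data <- open_dataset(c("data/modis_ndvi_transformed",
                            "data/sentinel_ndvi_transformed"))

out_dir <- "data/ndvi_historical_means"
dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)

# One output file per day of year; map_vec() returns the vector of file paths
historical_files <- map_vec(1:3, .progress = TRUE, function(i) {   # 1:366 in the real pipeline

  filename <- file.path(out_dir, glue::glue("ndvi_historical_mean_doy_{i}.gz.parquet"))

  ndvi_data |>
    filter(doy == i) |>
    group_by(x, y, doy) |>
    summarize(ndvi_sd = sd(ndvi, na.rm = TRUE),
              ndvi    = mean(ndvi, na.rm = TRUE),
              .groups = "drop") |>
    filter(!is.na(ndvi_sd)) |>            # drop cells whose series is constant
    write_parquet(filename, compression = "gzip", compression_level = 5)

  filename
})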
104 changes: 104 additions & 0 deletions R/file_partition_duckdb.R
@@ -0,0 +1,104 @@
#' Partition files for DuckDB database
#'
#' This function partitions explanatory variable files for the DuckDB database in R.
#' It takes the file sources, the directory in which to store the partitioned files,
#' the template for file naming, and the range of years and months to cover.
#'
#' @author Nathan C. Layman
#'
#' @param sources The sources from where the files will be read.
#' @param path The directory where the partitioned files will be stored. Default is "data/explanatory_variables".
#' @param basename_template The template used to name the partitioned files. Default is "explanatory_variables_{.y}_{.m}.gz.parquet".
#' @param years The years for which the files need to be partitioned. Default is 2007:2010.
#' @param months The months for which the files need to be partitioned. Default is 1:12.
#'
#' @return A string vector representing the filepath to each partitioned file.
#'
#' @note The function opens a DuckDB connection, loads the files from each source into a table (filtered to the
#' selected year and month where possible), natural-joins the tables, and writes each year/month partition
#' out in Parquet format with the gzip codec.
#'
#' @examples
#' file_partition_duckdb(sources = list(ndvi_anomalies = "source_path1", weather_anomalies = "source_path2"),
#'                       path = "data/explanatory_variables", basename_template = "explanatory_variables_{.y}_{.m}.gz.parquet",
#'                       years = 2007:2010, months = 1:12)
#'
#' @export
file_partition_duckdb <- function(sources, # A named, nested list of parquet files
path = "data/explanatory_variables",
basename_template = "explanatory_variables_{.y}_{.m}.gz.parquet",
years = 2007:2010,
months = 1:12) {

# NCL change to branch off of model date for combo
# This approach does work. Only writing complete datasets
# 2005 doesn't have any outbreak history so what do we input?

file_partitions <- expand.grid(.y = years, .m = months)

files <- pmap_vec(file_partitions, function(.y, .m) {

# Create a connection to a DuckDB database
con <- duckdb::dbConnect(duckdb::duckdb())

# For each explanatory variable target create a table filtered appropriately
walk2(names(sources), sources, function(table_name, list_of_files) {

# Prepare the list of files
parquet_list <- glue::glue("SELECT * FROM '{list_of_files}'")

file_schemas <- map(list_of_files, ~arrow::open_dataset(.x)$schema)
unified_schema <- all(map_vec(file_schemas, ~.x == file_schemas[[1]]))

# Filter on year/month where those columns exist to keep the memory footprint as small as possible
parquet_filter <- c()
if(!is.null(file_schemas[[1]]$year)) parquet_filter <- c(parquet_filter, paste("year ==", .y))
if(!is.null(file_schemas[[1]]$month)) parquet_filter <- c(parquet_filter, paste("month ==", .m))
if(length(parquet_filter)) {
parquet_filter <- paste("WHERE", paste(parquet_filter, collapse = " AND "))
} else {
parquet_filter = ""
}

parquet_list <- glue::glue("{parquet_list} {parquet_filter}")

# Check if all schemas are identical
if(unified_schema) {

# If all schema are identical: union all files
parquet_list <- paste(parquet_list, collapse = " UNION ALL ")

} else {

# If not: inner join all files
parquet_list <- glue::glue("({parquet_list})")
parquet_list <- glue::glue("{parquet_list} AS {tools::file_path_sans_ext(basename(list_of_files))}")
parquet_list <- paste0("SELECT * FROM ", paste(parquet_list, collapse = " NATURAL JOIN "))
}

# Set up query to add the table to the database
query <- glue::glue("CREATE OR REPLACE TABLE {table_name} AS {parquet_list}")

# Execute the query
add_table_result <- DBI::dbExecute(con, query)
message(glue::glue("{table_name} table created with {add_table_result} rows"))
})

# Establish a file name for the combination of month and year
filename <- file.path(path, glue::glue(basename_template))

# Set up a natural inner join for all the tables and output the result to file(s)
query <- glue::glue("COPY (SELECT * FROM {paste(names(sources), collapse = ' NATURAL JOIN ')}) TO '{filename}' (FORMAT 'parquet', CODEC 'gzip', ROW_GROUP_SIZE 100_000)")

# Execute the join
result <- DBI::dbExecute(con, query)
message(result)

# Clean up the database connection
duckdb::dbDisconnect(con)

# Return filename for the list
filename
})

files
}
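
Because table names come from names(sources), the sources argument must be a named list, which is exactly how the explanatory_variable_sources target in _targets.R below is now built. A hypothetical call, assuming the function and its dependencies (purrr, glue, duckdb, DBI, arrow) are loaded and that each element is a character vector of existing parquet files:

# Hypothetical upstream outputs, one character vector of parquet paths per source
sources <- list(
  weather_anomalies = c("data/weather_anomalies/weather_anomaly_2007_1.gz.parquet"),
  ndvi_anomalies    = c("data/ndvi_anomalies/ndvi_anomaly_2007_1.gz.parquet")
)

explanatory_files <- file_partition_duckdb(
  sources = sources,
  path = "data/explanatory_variables",
  basename_template = "explanatory_variables_{.y}_{.m}.gz.parquet",
  years = 2007,
  months = 1
)

# One gzip-compressed parquet file per year/month combination
explanatory_files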
9 changes: 7 additions & 2 deletions R/submit_modis_ndvi_bundle_request.R
@@ -56,12 +56,17 @@ submit_modis_ndvi_bundle_request <- function(modis_ndvi_token,
}
message(" ")

bundle_response <- httr::GET(paste("https://appeears.earthdatacloud.nasa.gov/api/bundle/", task_id, sep = ""), httr::add_headers(Authorization = modis_ndvi_token))
bundle_response <- httr::GET(paste("https://appeears.earthdatacloud.nasa.gov/api/bundle/", task_id, sep = ""),
httr::add_headers(Authorization = modis_ndvi_token))

bundle_response <- jsonlite::fromJSON(jsonlite::toJSON(httr::content(bundle_response))) |>
bind_cols() |>
filter(file_type == "tif", grepl("NDVI", file_name)) |>
mutate(year_doy = sub(".*doy(\\d+).*", "\\1", file_name),
start_date = as.Date(year_doy, format = "%Y%j")) # confirmed this is start date through manual download test
start_date = as.Date(year_doy, format = "%Y%j"),
year = year(start_date)) |> # confirmed this is start date through manual download test
arrange(start_date) |>
filter(year == modis_ndvi_task_id_continent$year) # Ensure we're not getting stuff outside the requested year which might lead to dupes

# Return bundle response files
return(bundle_response)
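
The added filter leans on the yyyyddd (year plus day-of-year) token that AppEEARS embeds in the NDVI file names. A small, self-contained sketch of that parsing step with a made-up file name:

library(lubridate)

# Made-up AppEEARS-style NDVI file name containing a doy + yyyyddd token
file_name <- "MOD13A2.061__1_km_16_days_NDVI_doy2023049_aid0001.tif"

year_doy   <- sub(".*doy(\\d+).*", "\\1", file_name)  # "2023049"
start_date <- as.Date(year_doy, format = "%Y%j")      # 2023-02-18
year(start_date)                                      # 2023, used for the duplicate filter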
23 changes: 13 additions & 10 deletions R/transform_ndvi.R
@@ -17,21 +17,26 @@ transform_ndvi <- function(modis_ndvi_transformed,
# "data/sentinel_ndvi_transformed/transformed_sentinel_NDVI_2018-10-22_to_2018-11-01.gz.parquet"
# Has 2018-10-22 in both files.

ndvi_transformed_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed))

file_partitions <- expand.grid(.y = ndvi_years, .m = ndvi_months)

# Also, we've got overlap between the MODIS and Sentinel data: for some dates in 2018
# and 2019 there are ndvi values for each location and date from both
# sources. The current solution is to average them.
ndvi_transformed <- map2_vec(ndvi_years, ndvi_months, function(.y, .m) {
ndvi_transformed_files <- pmap_vec(file_partitions, function(.y, .m) {

ndvi_transformed_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed)) |>
ndvi_transformed_data <- ndvi_transformed_dataset |>
filter(year == .y,
month == .m) |>
select(-source) |>
group_by(x, y, date, doy, month, year) |>
summarize(ndvi = mean(ndvi), .groups = "drop")
summarize(ndvi = mean(ndvi), .groups = "drop") |>
collect()

# Set filename
save_filename <- file.path(ndvi_transformed_directory, glue::glue("ndvi_transformed_{.y}_{.m}.gz.parquet"))
message(paste("Combining ndvi sources for", .y, "month", .m))
message(paste("Combining ndvi sources for", .y, "month", .m, "with", nrow(ndvi_transformed_data), "rows"))

# Check if file already exists and can be read
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)
@@ -41,13 +46,11 @@ transform_ndvi <- function(modis_ndvi_transformed,
return(save_filename)
}

arrow::write_parquet(ndvi_transformed_dataset, save_filename, compression = "gzip", compression_level = 5)

rm(ndvi_transformed_dataset)

arrow::write_parquet(ndvi_transformed_data, save_filename, compression = "gzip", compression_level = 5)

return(save_filename)
})

ndvi_transformed
ndvi_transformed_files

}
}
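
Where the two sensors overlap, transform_ndvi() keeps one row per pixel and date by averaging the two ndvi values. A toy, in-memory illustration of that group/summarize step (the real code runs it lazily on an arrow dataset and collects afterwards):

library(dplyr)

# Toy rows: the same pixel and date reported by both sources
ndvi_overlap <- tibble::tribble(
  ~x,   ~y,   ~date,                 ~doy, ~month, ~year, ~source,    ~ndvi,
  36.8, -1.3, as.Date("2018-10-22"), 295L, 10L,    2018L, "modis",    0.61,
  36.8, -1.3, as.Date("2018-10-22"), 295L, 10L,    2018L, "sentinel", 0.57
)

ndvi_overlap |>
  select(-source) |>
  group_by(x, y, date, doy, month, year) |>
  summarize(ndvi = mean(ndvi), .groups = "drop")
# One row remains, with ndvi = 0.59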
67 changes: 39 additions & 28 deletions _targets.R
@@ -443,10 +443,15 @@ dynamic_targets <- tar_plan(
# when we go to forecast. Right now we're just using step function
# interpolation where the NDVI value is constant for the entire 16-day period,
# then it steps up or down to the next interval's NDVI value.

# NCL:
tarchetypes::tar_group_size(name = modis_ndvi_requests,
size = 10,
command = modis_ndvi_bundle_request |>
arrange(start_date) |>
group_by(sha256) |> # Remove duplicate file requests
slice_max(created, n = 1) |>
ungroup() |>
mutate(end_date = lead(start_date) - days(1),
end_date = case_when(
is.na(end_date) ~ start_date + 15,
@@ -492,13 +497,16 @@ dynamic_targets <- tar_plan(
tar_target(ndvi_years, lubridate::year(modis_task_end_dates)),

# Combine modis and sentinel datasets into a single source for lagging
# There is some kind of bug between targets and arrow which interferes
# when branching over ndvi_years: I end up with empty parquet files.
# I have no idea why pattern = map(ndvi_years) breaks things.
# The solution is to not use dynamic branching here.
tar_target(ndvi_transformed, transform_ndvi(modis_ndvi_transformed,
sentinel_ndvi_transformed,
ndvi_transformed_directory,
ndvi_years,
ndvi_months = 1:12,
overwrite = parse_flag(c("OVERWRITE_MODIS_NDVI", "OVERWRITE_SENTINEL_NDVI", "OVERWRITE_NDVI_TRANSFORMED"))),
pattern = map(ndvi_years),
format = "file",
repository = "local",
error = "null",
@@ -638,7 +646,8 @@ data_targets <- tar_plan(
weather_historical_means_directory,
weather_historical_means_AWS), # Enforce dependency
format = "file",
repository = "local"),
repository = "local",
cue = parse_flag("OVERWRITE_WEATHER_ANOMALIES", cue = T)),

# Next step put weather_historical_means files on AWS.
tar_target(weather_historical_means_AWS_upload, AWS_put_files(weather_historical_means,
@@ -733,14 +742,13 @@ data_targets <- tar_plan(
error = "null",
cue = tar_cue("always")), # Enforce dependency

# NCL NOTE: MAKE NDVI TRANSFORMED THAT DOES THE STEP INTERPOLATION AND COMBINES NDVI SOURCES
# FEED THAT INTO ANOMALIES. WE NEED IT SEPARATE TO DO LAGS.
tar_target(ndvi_historical_means, calculate_ndvi_historical_means(sentinel_ndvi_transformed,
modis_ndvi_transformed,
ndvi_historical_means_directory,
ndvi_historical_means_AWS), # Enforce dependency
format = "file",
repository = "local"),
repository = "local",
cue = parse_flag("OVERWRITE_NDVI_TRANSFORMED", cue = T)),


# Next step put ndvi_historical_means files on AWS.
@@ -847,32 +855,35 @@ data_targets <- tar_plan(

# Combine all static and dynamic data layers.
# Partition into separate parquet files by month and year.
tar_target(explanatory_variable_sources, tribble(
~type, ~name, ~list_of_files,
"static", "soil_preprocessed", soil_preprocessed,
"static", "aspect_preprocessed", aspect_preprocessed,
"static", "slope_preprocessed", slope_preprocessed,
"static", "glw_preprocessed", glw_preprocessed,
"static", "elevation_preprocessed", elevation_preprocessed,
"static", "bioclim_preprocessed", bioclim_preprocessed,
"static", "landcover_preprocessed", landcover_preprocessed,
"dynamic", "wahis_outbreak_history", wahis_outbreak_history,
"dynamic", "ecmwf_forecasts_transformed", ecmwf_forecasts_transformed,
"dynamic", "weather_anomalies", weather_anomalies,
"dynamic", "forecasts_anomalies", forecasts_anomalies,
"dynamic", "ndvi_anomalies", ndvi_anomalies,
# "dynamic", "ndvi_transformed_lagged", ndvi_transformed_lagged,
# "dynamic", "nasa_weather_transformed_lagged", nasa_weather_transformed_lagged
# "historical_mean", "weather_historical_means", weather_historical_means, # doy joins must come last
# "historical_mean", "ndvi_historical_means", ndvi_historical_means
# Why is there no way to deparse/substitute a list of variables?
tar_target(explanatory_variable_sources, list(
soil_preprocessed = soil_preprocessed,
aspect_preprocessed = aspect_preprocessed,
slope_preprocessed = slope_preprocessed,
glw_preprocessed = glw_preprocessed,
elevation_preprocessed = elevation_preprocessed,
bioclim_preprocessed = bioclim_preprocessed,
landcover_preprocessed = landcover_preprocessed,
wahis_outbreak_history = wahis_outbreak_history,
ecmwf_forecasts_transformed = ecmwf_forecasts_transformed,
weather_anomalies = weather_anomalies,
forecasts_anomalies = forecasts_anomalies,
ndvi_anomalies = ndvi_anomalies
# ndvi_transformed_lagged = ndvi_transformed_lagged,
# nasa_weather_transformed_lagged = nasa_weather_transformed_lagged
# weather_historical_means = weather_historical_means,
# ndvi_historical_means = ndvi_historical_means
)),

# Join all explanatory variable data sources using file based partitioning instead of hive
tar_target(explanatory_variables, join_data_sources(sources = explanatory_variable_sources,
path = explanatory_variables_directory,
basename_template = "explanatory_variable.parquet",
years = nasa_weather_years,
months = 1:12)),
# error needs to be null here because some predictors (like wahis_outbreak_sources) aren't
# present at all times.
tar_target(explanatory_variables, file_partition_duckdb(sources = explanatory_variable_sources,
path = explanatory_variables_directory,
years = nasa_weather_years,
months = 1:12),
format = "file",
repository = "local"),

# Next step put combined_anomalies files on AWS.
tar_target(explanatory_variables_AWS_upload, AWS_put_files(explanatory_variables,