
Commit

Work on ecmwf join issues
n8layman committed Dec 10, 2024
1 parent 1b97073 commit 426c793
Showing 5 changed files with 227 additions and 348 deletions.
Binary file modified .env
Binary file not shown.
133 changes: 72 additions & 61 deletions R/file_partition_duckdb.R
@@ -23,82 +23,93 @@
#' years = 2007:2010, months = 1:12)
#'
#' @export
file_partition_duckdb <- function(sources, # A named, nested list of parquet files
path = "data/explanatory_variables",
basename_template = "explanatory_variables_{.y}_{.m}.gz.parquet",
years = 2007:2010,
months = 1:12) {
file_partition_duckdb <- function(explanatory_variable_sources, # A named, nested list of parquet files
explanatory_variables_directory = "data/explanatory_variables",
model_dates_selected,
basename_template = "explanatory_variables_{model_dates_selected}.gz.parquet",
overwrite = FALSE) {

# NCL change to branch off of model date for combo
# This approach does work. Only writing complete datasets
# 2005 doesn't have any outbreak history so what do we input?
# Next step is lagged data.
# JOINING ON model_dates_selected means going back and changing 'base_date' to 'date' in ecmwf_transformed and anomaly
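# (e.g. a source keyed on 'base_date' would not share a join key with sources keyed
# on 'date', so the NATURAL JOIN below would silently match on fewer columns than intended)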

file_partitions <- expand.grid(.y = years, .m = months)
# Check that we're only working on one date at a time
stopifnot(length(model_dates_selected) == 1)

files <- pmap_vec(file_partitions, function(.y, .m) {
# Set filename
save_filename <- file.path(explanatory_variables_directory, glue::glue(basename_template))
message(paste0("Combining explanatory variables for ", model_dates_selected))

# Check if file already exists and can be read
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

if(!is.null(error_safe_read_parquet(save_filename)) & !overwrite) {
message("file already exists and can be loaded, skipping download")
return(save_filename)
}
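# purrr::possibly() wraps arrow::open_dataset so a missing or corrupt parquet file
# returns NULL instead of erroring, and a failed open falls through to regeneration below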

# Create a connection to a DuckDB database
con <- duckdb::dbConnect(duckdb::duckdb())

# For each explanatory variable target create a table filtered appropriately
walk2(names(explanatory_variable_sources), explanatory_variable_sources, function(table_name, list_of_files) {

# Prepare the list of files
parquet_list <- glue::glue("SELECT * FROM '{list_of_files}'")

# Create a connection to a DuckDB database
con <- duckdb::dbConnect(duckdb::duckdb())
file_schemas <- map(list_of_files, ~arrow::open_dataset(.x)$schema)
unified_schema <- all(map_vec(file_schemas, ~.x == file_schemas[[1]]))

# For each explanatory variable target create a table filtered appropriately
walk2(names(sources), sources, function(table_name, list_of_files) {

# Prepare the list of files
parquet_list <- glue::glue("SELECT * FROM '{list_of_files}'")

file_schemas <- map(list_of_files, ~arrow::open_dataset(.x)$schema)
unified_schema <- all(map_vec(file_schemas, ~.x == file_schemas[[1]]))
parquet_filter <- c()
if(!is.null(file_schemas[[1]]$year)) parquet_filter <- c(parquet_filter, paste("year ==", year(model_dates_selected)))
if(!is.null(file_schemas[[1]]$month)) parquet_filter <- c(parquet_filter, paste("month ==", month(model_dates_selected)))
if(length(parquet_filter)) {
parquet_filter <- paste("WHERE", paste(parquet_filter, collapse = " AND "))
} else {
parquet_filter <- ""
}

parquet_list <- glue::glue("{parquet_list} {parquet_filter}")

# Filter if the type is dynamic to keep the memory footprint as small as possible
parquet_filter <- c()
if(!is.null(file_schemas[[1]]$year)) parquet_filter <- c(parquet_filter, paste("year ==", .y))
if(!is.null(file_schemas[[1]]$month)) parquet_filter <- c(parquet_filter, paste("month ==", .m))
if(length(parquet_filter)) {
parquet_filter <- paste("WHERE", paste(parquet_filter, collapse = " AND "))
} else {
parquet_filter <- ""
}
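# NOTE: the next line appends a second WHERE clause even when a year filter was
# already collected in parquet_filter above, which would yield invalid SQL for
# year-partitioned sources; it reads like a leftover from reworking the filter logic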
if(!is.null(file_schemas[[1]]$year)) parquet_list <- glue::glue("{parquet_list} WHERE year = {year(model_dates_selected)}")

# Check if all schemas are identical
if(unified_schema) {

parquet_list <- glue::glue("{parquet_list} {parquet_filter}")
# If all schemas are identical: union all files
parquet_list <- paste(parquet_list, collapse = " UNION ALL ")

# Check if all schemas are identical
if(unified_schema) {

# If all schemas are identical: union all files
parquet_list <- paste(parquet_list, collapse = " UNION ALL ")

} else {

# If not: inner join all files
parquet_list <- glue::glue("({parquet_list})")
parquet_list <- glue::glue("{parquet_list} AS {tools::file_path_sans_ext(basename(list_of_files))}")
parquet_list <- paste0("SELECT * FROM ", paste(parquet_list, collapse = " NATURAL JOIN "))
}
} else {

# Set up query to add the table to the database
query <- glue::glue("CREATE OR REPLACE TABLE {table_name} AS {parquet_list}")

# Execute the query
add_table_result <- DBI::dbExecute(con, query)
message(glue::glue("{table_name} table created with {add_table_result} rows"))
})

# Establish a file name for the combination of month and year
filename <- file.path(path, glue::glue(basename_template))

# Set up a natural inner join for all the tables and output the result to file(s)
query <- glue::glue("COPY (SELECT * FROM {paste(names(sources), collapse = ' NATURAL JOIN ')}) TO '{filename}' (FORMAT 'parquet', CODEC 'gzip', ROW_GROUP_SIZE 100_000)")

# Execute the join
result <- DBI::dbExecute(con, query)
message(result)
# If not: inner join all files
parquet_list <- glue::glue("({parquet_list})")
parquet_list <- glue::glue("{parquet_list} AS {tools::file_path_sans_ext(basename(list_of_files))}")
parquet_list <- paste0("SELECT * FROM ", paste(parquet_list, collapse = " NATURAL JOIN "))
}
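# Schematically, for two files a.parquet and b.parquet this produces either:
#   identical schemas: SELECT * FROM 'a.parquet' ... UNION ALL SELECT * FROM 'b.parquet' ...
#   differing schemas: SELECT * FROM (SELECT * FROM 'a.parquet' ...) AS a
#                        NATURAL JOIN (SELECT * FROM 'b.parquet' ...) AS b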

# Clean up the database connection
duckdb::dbDisconnect(con)
# Set up query to add the table to the database
query <- glue::glue("CREATE OR REPLACE TABLE {table_name} AS {parquet_list}")

# Return filename for the list
filename
})
# Execute the query
add_table_result <- DBI::dbExecute(con, query)
message(glue::glue("{table_name} table created with {add_table_result} rows"))
})

# Set up a natural inner join for all the tables and output the result to file(s)
query <- glue::glue("COPY (SELECT * FROM {paste(names(explanatory_variable_sources), collapse = ' NATURAL JOIN ')}) TO '{save_filename}' (FORMAT 'parquet', CODEC 'gzip', ROW_GROUP_SIZE 100_000)")

# Execute the join
result <- DBI::dbExecute(con, query)
message(result)

# Clean up the database connection
duckdb::dbDisconnect(con)

# Return filename for the list
save_filename

files
}
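For context, a minimal sketch of how the reworked function is now called for a single model date. The source names and file paths below are hypothetical placeholders, not paths from the pipeline:

explanatory_variable_sources <- list(
  ecmwf_transformed = list("data/ecmwf/ecmwf_seasonal_forecast_1_2007.gz.parquet"),
  ndvi_anomalies    = list("data/ndvi/ndvi_anomaly_2007_01.gz.parquet")
)

file_partition_duckdb(explanatory_variable_sources,
                      explanatory_variables_directory = "data/explanatory_variables",
                      model_dates_selected = as.Date("2007-01-01"),
                      overwrite = FALSE)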
19 changes: 10 additions & 9 deletions R/transform_ecmwf_forecasts.R
@@ -6,7 +6,7 @@
#' @author Nathan Layman, Emma Mendelsohn
#'
#' @param ecmwf_forecasts_api_parameters A list containing the parameters for the ECMWF API request such as year, month, variables, etc.
#' @param local_folder Character. The path to the local folder where transformed files will be saved. Defaults to `ecmwf_forecasts_transformed_directory`.
#' @param ecmwf_forecasts_transformed_directory Character. The path to the local folder where transformed files will be saved.
#' @param continent_raster_template The path to the raster file used as a template for continent-level spatial alignment.
#' @param overwrite A boolean flag indicating whether to overwrite existing processed files. Default is FALSE.
#' @param ... Further arguments not used by the function but included for compatibility.
@@ -21,7 +21,7 @@
#'
#' @author Nathan Layman, Emma Mendelsohn
transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
local_folder,
ecmwf_forecasts_transformed_directory,
continent_raster_template,
overwrite = FALSE,
...) {
@@ -33,7 +33,7 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
year <- ecmwf_forecasts_api_parameters$year
month <- unlist(ecmwf_forecasts_api_parameters$month)

transformed_file <- file.path(local_folder, glue::glue("ecmwf_seasonal_forecast_{month}_{year}.gz.parquet"))
transformed_file <- file.path(ecmwf_forecasts_transformed_directory, glue::glue("ecmwf_seasonal_forecast_{month}_{year}.gz.parquet"))

# Check if transformed file already exists and can be loaded.
# If so return file name and path **unless it's the current year**
@@ -56,7 +56,7 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
raw_files <- expand.grid(product_type = unlist(ecmwf_forecasts_api_parameters$product_types),
variable = unlist(ecmwf_forecasts_api_parameters$variables)) |>
rowwise() |>
mutate(raw_file = file.path(local_folder, glue::glue("ecmwf_seasonal_forecast_{month}_{year}_{product_type}_{variable}.grib")))
mutate(raw_file = file.path(ecmwf_forecasts_transformed_directory, glue::glue("ecmwf_seasonal_forecast_{month}_{year}_{product_type}_{variable}.grib")))

# Restrict to one product type
raw_files <- raw_files |> filter(str_detect(product_type, "mean"))
@@ -93,7 +93,7 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
.progress = TRUE,
~ecmwfr::wf_request(request = .x,
user = Sys.getenv("ECMWF_USERID"),
path = local_folder,
path = ecmwf_forecasts_transformed_directory,
verbose = T))

# Verify that terra can open all the saved grib files. If not error out to try again next time
@@ -129,12 +129,13 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
# 2. Fix total_precipitation metadata and convert units from m/second to mm/day.
# Note the variable name is total_precipitation but it is really *mean total precipitation rate*
# 3. Correct precip sd
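# (unit check: 1 m/s = 1000 mm/s, and 1000 mm/s * 86,400 s/day = 8.64e7 mm/day,
#  which is the conversion factor applied below)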
grib_data <- grib_data |> mutate(mean = ifelse(units == "C", mean - 273.15, ((mean > 0) * 8.64e+7 * mean)),
grib_data <- grib_data |> mutate(date = base_date,
mean = ifelse(units == "C", mean - 273.15, ((mean > 0) * 8.64e+7 * mean)),
sd = ifelse(units == "", ((sd > 0) * 8.64e+7 * sd), sd),
var_id = ifelse(units == "", "tprate", var_id),
units = ifelse(units == "", "mm/day", units),
month = as.integer(lubridate::month(base_date)), # Base month
year = as.integer(lubridate::year(base_date)), # Base year
month = as.integer(lubridate::month(date)), # Base month
year = as.integer(lubridate::year(date)), # Base year
lead_month = as.integer(lubridate::month(forecast_end_date - 1)),
lead_year = as.integer(lubridate::year(forecast_end_date - 1)),
variable = fct_recode(variable,
@@ -144,7 +145,7 @@

# Calculate relative humidity from temperature and dewpoint temperature
grib_data <- grib_data |>
select(x, y, base_date, month, year, lead_month, lead_year, mean, sd, variable) |>
select(x, y, date, month, year, lead_month, lead_year, mean, sd, variable) |>
pivot_wider(names_from = variable, values_from = c(mean, sd), names_glue = "{variable}_{.value}") |> # Reshape to make it easier to calculate composite values like relative humidity
mutate(

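The relative-humidity expression itself is truncated in this view. As a sketch only, one standard way to derive it from those two fields is the August-Roche-Magnus approximation; the column names below follow the names_glue pattern above but are assumptions, and the formula actually used in the file may differ:

# Saturation vapor pressure (hPa) at temperature t in degrees C (Magnus form)
magnus <- function(t) 6.1094 * exp(17.625 * t / (t + 243.04))

# RH is the ratio of vapor pressure at the dewpoint to saturation vapor pressure
grib_data <- grib_data |>
  dplyr::mutate(relative_humidity_mean = 100 * magnus(dewpoint_temperature_mean) / magnus(temperature_mean))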
13 changes: 7 additions & 6 deletions _targets.R
@@ -592,7 +592,7 @@ dynamic_targets <- tar_plan(
# TODO: NAs outside of the continent
tar_target(ecmwf_forecasts_transformed,
transform_ecmwf_forecasts(ecmwf_forecasts_api_parameters,
local_folder = ecmwf_forecasts_transformed_directory,
ecmwf_forecasts_transformed_directory,
continent_raster_template,
overwrite = parse_flag("OVERWRITE_ECMWF_FORECASTS"),
get_ecmwf_forecasts_AWS), # Enforce Dependency
@@ -845,7 +845,7 @@ data_targets <- tar_plan(
error = "null",
cue = tar_cue("always")), # Enforce dependency


# Notes: This data will be aggregated. Mean monthly anomaly lagged 1, 2, 3 months back
# Current date, lagged ndvi and weather anomalies, current ndvi and weather anomalies, forecast anomaly
# over forecast interval. Forecast interval. Response is number of outbreaks (or cases?) over forecast interval
@@ -878,10 +878,11 @@ data_targets <- tar_plan(
# Join all explanatory variable data sources using file based partitioning instead of hive
# error needs to be null here because some predictors (like wahis_outbreak_sources) aren't
# present in all times.
tar_target(explanatory_variables, file_partition_duckdb(sources = explanatory_variable_sources,
path = explanatory_variables_directory,
years = nasa_weather_years,
months = 1:12),
tar_target(explanatory_variables, file_partition_duckdb(explanatory_variable_sources,
explanatory_variables_directory,
model_dates_selected,
overwrite = parse_flag("OVERWRITE_EXPLANATORY_VARIABLES")),
pattern = map(model_dates_selected),
format = "file",
repository = "local"),

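With pattern = map(model_dates_selected), targets builds one branch (and therefore one partitioned parquet file) per model date, which is what the new stopifnot(length(model_dates_selected) == 1) guard in file_partition_duckdb() assumes. Outside of targets, the equivalent loop would look roughly like this sketch:

explanatory_variables <- purrr::map_chr(model_dates_selected, function(d) {
  file_partition_duckdb(explanatory_variable_sources,
                        explanatory_variables_directory,
                        model_dates_selected = d,
                        overwrite = FALSE)
})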
