Fix bug in forecast anomaly target
n8layman committed Dec 2, 2024
1 parent dbb23b2 commit e01c590
Showing 13 changed files with 2,454 additions and 1,568 deletions.
Binary file modified .env
84 changes: 44 additions & 40 deletions R/calculate_forecasts_anomalies.R
@@ -33,7 +33,7 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
weather_historical_means,
forecasts_anomalies_directory,
model_dates_selected,
lead_intervals,
forecast_intervals,
overwrite = FALSE,
...) {

@@ -42,8 +42,7 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,

# Set filename
save_filename <- file.path(forecasts_anomalies_directory, glue::glue("forecast_anomaly_{model_dates_selected}.gz.parquet"))
message(paste0("Calculating forecast anomalies for ", model_dates_selected))


# Check if file already exists and can be read
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

@@ -75,54 +74,63 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
# Updated notes after discussion with Noam
# We want current date, current ndvi, forecast amount in days, forecast weather, forecast outbreak history (check this), and all the static layers. So we want the weather forecast in long format, not wide.

# Get a tibble of all the dates in the anomaly forecast range for the given lead interval
model_dates <- tibble(date = seq(from = model_dates_selected + lead_interval_start,
to = model_dates_selected + lead_interval_end - 1, by = 1)) |>
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = as.integer(lubridate::month(date)), # Extract month
year = as.integer(lubridate::year(date))) # Extract year

# Get the relevant forecast data, from the most recent base_date that came
# before the selected model date.
forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |> filter(base_date <= model_dates_selected)
forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |>
filter(base_date <= model_dates_selected)

relevant_base_date <- forecasts_transformed_dataset |>
select(base_date) |>
distinct() |>
arrange(base_date) |>
pull(base_date, as_vector = TRUE)
forecasts_transformed_dataset <- forecasts_transformed_dataset |> filter(base_date == relevant_base_date)
arrange(desc(base_date)) |>
pull(base_date, as_vector = TRUE) |>
head(n = 1)

forecasts_transformed_dataset <- forecasts_transformed_dataset |>
filter(base_date == relevant_base_date) |>
select(-base_date, -month, -year) |>
mutate(month = lead_month, year = lead_year)

historical_means <- arrow::open_dataset(weather_historical_means)
historical_means <- arrow::open_dataset(weather_historical_means)

forecasts_anomalies <- map_dfr(1:length(lead_intervals), function(i) {
forecasts_anomalies <- map_dfr(1:(length(forecast_intervals) - 1), function(i) {

lead_interval_start <- c(0, lead_intervals[-length(lead_intervals)])[i]
lead_interval_end <- lead_intervals[i]
lead_interval_start <- forecast_intervals[i]
lead_interval_end <- forecast_intervals[i+1]

message(glue::glue("Processing forecast interval {lead_interval_start}-{lead_interval_end} days out"))
message(glue::glue("Calculating ECMWF anomalies on {model_dates_selected} for {lead_interval_start}-{lead_interval_end} day forecast"))

# Get a tibble of all the dates in the anomaly forecast range for the given lead interval
forecast_anomaly <- model_dates |>
mutate(lead_interval_start = lead_interval_start, # Store lead interval duration
lead_interval_end = lead_interval_end) # Store lead interval duration
forecast_anomaly <- tibble(date = seq(from = model_dates_selected + lead_interval_start,
to = model_dates_selected + lead_interval_end - 1, by = 1)) |>
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = as.integer(lubridate::month(date)), # Extract month
year = as.integer(lubridate::year(date)),
             lead_interval_start = lead_interval_start, # Store lead interval start
             lead_interval_end = lead_interval_end) # Store lead interval end

# Much faster to do the joins within an arrow context
forecast_anomaly_file <- tempfile()
arrow::write_parquet(forecast_anomaly, forecast_anomaly_file)
forecast_anomaly <- arrow::open_dataset(forecast_anomaly_file)

# Join historical means based on day of year (doy)
forecast_anomaly <- historical_means |> filter(doy >= min(model_dates$doy)) |>
right_join(model_dates, by = c("doy")) |> relocate(-matches("precipitation|temperature|humidity"))
forecast_anomaly <- forecast_anomaly |>
left_join(historical_means, by = "doy") |>
relocate(-matches("precipitation|temperature|humidity"))

# Join in forecast data based on x, y, month, and year.
# The historical data and forecast data _should_ have the same column
# names so differentiate with a suffix
forecast_anomaly <- forecast_anomaly |>
left_join(forecasts_transformed_dataset, by = c("x", "y", "month", "year"), suffix = c("_historical", "_forecast"))

left_join(forecasts_transformed_dataset, by = c("x", "y", "month", "year"),
suffix = c("_historical", "_forecast"))

# Summarize by calculating the mean for each variable type (temperature, precipitation, relative_humidity)
# and across both historical data and forecast data over the days in the model_dates range
forecast_anomaly <- forecast_anomaly |>
group_by(x, y, lead_interval_start, lead_interval_end) |>
summarize(across(matches("temperature|precipitation|relative_humidity"), ~mean(.x)), .groups = "drop")

summarize(across(matches("temperature|precipitation|relative_humidity"), ~mean(.x, na.rm = T)), .groups = "drop")

# Calculate temperature anomalies
forecast_anomaly <- forecast_anomaly |>
@@ -140,21 +148,17 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
anomaly_forecast_scaled_relative_humidity = anomaly_forecast_relative_humidity / relative_humidity_sd_historical)

# Clean up intermediate columns
forecast_anomaly |>
mutate(date = model_dates_selected) |>
select(x, y, date, doy, month, year, 'lead_interval_start', 'lead_interval_end', starts_with("anomaly")) |>
# Regenerate month and year
forecast_anomaly <- forecast_anomaly |>
mutate(date = model_dates_selected,
           doy = lubridate::yday(date),
month = lubridate::month(date),
year = lubridate::year(date),
forecast_interval = lead_interval_end) |>
select(x, y, date, doy, month, year, forecast_interval, starts_with("anomaly")) |>
collect()
})

# Change forecast lead and start to just forecast weather column and forecast days. Include zero here. I don't think NASA weather is necessary then.
forecasts_anomalies <- forecasts_anomalies |>
select(-lead_interval_start) # |>
# pivot_wider(
# names_from = lead_interval_end,
# values_from = contains("anomaly"),
# names_sep = "_"
# )

# Write output to a parquet file
arrow::write_parquet(forecasts_anomalies, save_filename, compression = "gzip", compression_level = 5)

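To make the new boundary convention concrete: the rewritten loop pairs element i of forecast_intervals with element i+1, so a boundary vector of length n yields n - 1 forecast windows. A minimal sketch of that pairing with illustrative values (not taken from the pipeline):

library(purrr)
library(tibble)

# Illustrative boundaries only: consecutive pairs of day offsets define windows
forecast_intervals <- c(0, 30, 60, 90)
model_date <- as.Date("2023-06-01")

map_dfr(1:(length(forecast_intervals) - 1), function(i) {
  lead_interval_start <- forecast_intervals[i]
  lead_interval_end <- forecast_intervals[i + 1]
  tibble(lead_interval_start = lead_interval_start,
         lead_interval_end = lead_interval_end,
         first_day = model_date + lead_interval_start,
         last_day = model_date + lead_interval_end - 1) # end-exclusive, as in the diff
})
# Three windows: days 0-29, 30-59, and 60-89 after the model date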
5 changes: 2 additions & 3 deletions R/calculate_ndvi_anomalies.R
@@ -24,8 +24,7 @@
#' 'historical_means', '/directory_path/', 'model_date', overwrite = TRUE)
#'
#' @export
calculate_ndvi_anomalies <- function(sentinel_ndvi_transformed,
modis_ndvi_transformed,
calculate_ndvi_anomalies <- function(ndvi_transformed,
ndvi_historical_means,
ndvi_anomalies_directory,
model_dates_selected,
@@ -48,7 +47,7 @@ calculate_ndvi_anomalies <- function(sentinel_ndvi_transformed,
}

# Open dataset to transformed data
ndvi_transformed_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed)) |>
ndvi_transformed_dataset <- arrow::open_dataset(ndvi_transformed) |>
filter(date == model_dates_selected)

# Open dataset to historical ndvi data
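With the sentinel and modis arguments collapsed into a single ndvi_transformed argument, a call might look like the sketch below; the paths are hypothetical and assume the combined files now come from transform_ndvi():

# Hypothetical paths, for illustration only
calculate_ndvi_anomalies(
  ndvi_transformed = list.files("data/ndvi_transformed", full.names = TRUE),
  ndvi_historical_means = "data/ndvi_historical_means",
  ndvi_anomalies_directory = "data/ndvi_anomalies",
  model_dates_selected = as.Date("2023-06-01"),
  overwrite = FALSE
)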
30 changes: 30 additions & 0 deletions R/lag_data.R
@@ -0,0 +1,30 @@
lag_data <- function(data_files,
lag_intervals,
model_dates_selected,
overwrite = TRUE,
...) {

  # The goal is to average each data column over the lag interval.
  # Find the dates at the start and end of each interval, counting back from the model date,
  # then group by x, y, and interval and take the mean (don't forget na.rm = TRUE).

data <- arrow::open_dataset(data_files)

lagged_data <- map_dfr(1:(length(lag_intervals) - 1), function(i) {

start_date = model_dates_selected - days(lag_intervals[i])
end_date = model_dates_selected - days(lag_intervals[i+1] - 1)

    data |>
      filter(date >= end_date, date <= start_date) |>
      collect() |>
      select(-source) |>
      group_by(x, y, date, doy, month, year) |>
      summarize(across(everything(), ~mean(.x, na.rm = TRUE)), .groups = "drop") |>
      # The lag interval is encoded in the column-name suffix added below
      select(-doy, -month, -year) |>
      mutate(date = dplyr::lag(date, lag_intervals[i])) |>
      rename_with(~ paste(.x, "lag", lag_intervals[i], sep = "_"), contains("ndvi")) |>
      drop_na(date)
}) |> reduce(left_join, by = c("x", "y", "date")) |>
drop_na() |>
rename_with(~gsub("_0", "", .x), contains("_0"))

  lagged_data
}
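A sketch of how lag_data() might be invoked, assuming daily NDVI parquet files; the paths and intervals here are illustrative, not from the repository:

# Consecutive pairs of lag_intervals define windows looking back from the
# model date: (0, 30], (30, 60], (60, 90] days. The "_0" suffix produced by
# the first window is stripped by the final rename_with().
lagged_ndvi <- lag_data(
  data_files = list.files("data/ndvi_transformed", full.names = TRUE),
  lag_intervals = c(0, 30, 60, 90),
  model_dates_selected = as.Date("2023-06-01")
)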
8 changes: 4 additions & 4 deletions R/set_ecmwf_api_parameter.R
@@ -10,7 +10,7 @@
#' @param bbox_coords The bounding box coordinates for the location for which the data are to be retrieved.
#' @param variables The variables to retrieve from the API; default are '2m_dewpoint_temperature', '2m_temperature', and 'total_precipitation'.
#' @param product_types The type of data product to retrieve; default are 'monthly_mean', 'monthly_maximum', 'monthly_minimum', 'monthly_standard_deviation'.
#' @param leadtime_months The lead times in months for which the forecasts are made; default are '1', '2', '3', '4', '5', '6'.
#' @param lead_months The lead times in months for which the forecasts are made; default are '1', '2', '3', '4', '5', '6'.
#'
#' @return A tibble containing the set parameters.
#'
@@ -21,13 +21,13 @@
#' bbox_coords = c(50.8503, 4.3517),
#' variables = c("2m_temperature", "total_precipitation"),
#' product_types = c("monthly_mean", "monthly_maximum"),
#' leadtime_months = c("1", "2", "3"))
#' lead_months = c("1", "2", "3"))
#'
set_ecmwf_api_parameter <- function(start_year = 2005,
bbox_coords = continent_bounding_box,
variables = c("2m_dewpoint_temperature", "2m_temperature", "total_precipitation"),
product_types = c("monthly_mean", "monthly_maximum", "monthly_minimum", "monthly_standard_deviation"),
leadtime_months = c("1", "2", "3", "4", "5", "6")) {
lead_months = seq(1, 6)) {


# API details from:
@@ -84,5 +84,5 @@ set_ecmwf_api_parameter <- function(start_year = 2005,
mutate(spatial_bounds = list(spatial_bounds)) |>
mutate(variables = list(variables)) |>
mutate(product_types = list(product_types)) |>
mutate(leadtime_months = list(leadtime_months))
mutate(leadtime_months = list(as.character(lead_months)))
}
5 changes: 2 additions & 3 deletions R/set_model_dates.R
@@ -7,18 +7,17 @@
#' @param start_year The first year in the range of dates.
#' @param end_year The final year in the range of dates.
#' @param n_per_month The number of dates per month that should be generated.
#' @param lag_intervals The intervals at which the dates should lag.
#' @param seed The seed to be used in the random number generator. This is used to ensure reproducibility in the sequence of dates.
#'
#' @return A sequence of dates to be used in a model, based on the provided parameters.
#'
#' @note This function generates a sequence of dates using 'seq', 'sample', and 'lubridate::days_in_month'. The seed ensures reproducibility. The sequence of dates can be lagged by intervals and includes a desired number of dates per month between the start and end years.
#'
#' @examples
#' set_model_dates(start_year = 2000, end_year = 2002, n_per_month = 30, lag_intervals = 1, seed = 42)
#' set_model_dates(start_year = 2000, end_year = 2002, n_per_month = 30, seed = 42)
#'
#' @export
set_model_dates <- function(start_year, end_year, n_per_month, lag_intervals, seed = 123) {
set_model_dates <- function(start_year, end_year, n_per_month, seed = 123) {

# Create a vector of dates from January of start year to December of end year with n_per_month random days
# drawn from each month in the sequence.
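The note above names seq, sample, and lubridate::days_in_month as the building blocks; here is a minimal sketch of that sampling scheme (not the function's exact body):

library(lubridate)

set.seed(123) # reproducibility, as the seed argument intends
n_per_month <- 2
month_starts <- seq(as.Date("2000-01-01"), as.Date("2002-12-01"), by = "month")

# Draw n_per_month random days from each month, capped at the month's true length
model_dates <- sort(do.call(c, lapply(month_starts, function(m) {
  m + sample.int(days_in_month(m), n_per_month) - 1
})))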
6 changes: 3 additions & 3 deletions R/transform_ecmwf_forecasts.R
@@ -83,7 +83,6 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
dataset_short_name = "seasonal-monthly-single-levels",
target = basename(raw_file)
)

})

ecmwfr::wf_set_key(user = Sys.getenv("ECMWF_USERID"), key = Sys.getenv("ECMWF_TOKEN"))
@@ -113,7 +112,7 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
# Here mean refers to taking the average of the values within a grid cell.
# The mean operation performed within extract_grib_data averages across the forecast ensemble
grib_data <- pmap_dfr(raw_files, function(product_type, variable, raw_file) {
extract_grib_data(raw_file, continent_raster_template) |> mutate(variable = variable)
extract_grib_data(raw_file, template = continent_raster_template) |> mutate(variable = variable)
})

message(glue::glue("ecmwf_seasonal_forecast_{month}_{year} metadata successfully read."))
@@ -137,14 +136,15 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
month = as.integer(lubridate::month(base_date)), # Base month
year = as.integer(lubridate::year(base_date)), # Base year
lead_month = as.integer(lubridate::month(forecast_end_date - 1)),
lead_year = as.integer(lubridate::year(forecast_end_date - 1)),
variable = fct_recode(variable,
dewpoint = "2m_dewpoint_temperature",
temperature = "2m_temperature",
precipitation = "total_precipitation"))

# Calculate relative humidity from temperature and dewpoint temperature
grib_data <- grib_data |>
select(x, y, base_date, month, year, lead_months, mean, sd, variable) |>
select(x, y, base_date, month, year, lead_month, lead_year, mean, sd, variable) |>
pivot_wider(names_from = variable, values_from = c(mean, sd), names_glue = "{variable}_{.value}") |> # Reshape to make it easier to calculate composite values like relative humidity
mutate(

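The hunk is cut off before the humidity arithmetic, so the exact expression is not shown here. For reference, relative humidity is commonly derived from temperature and dewpoint via the Magnus approximation; the sketch below uses one common constant set and may not match the constants in this repository:

# Magnus approximation: RH = 100 * e(dewpoint) / e(temperature), inputs in degrees C
saturation_vapor_pressure <- function(t_c) exp((17.625 * t_c) / (243.04 + t_c))

relative_humidity <- function(temperature_c, dewpoint_c) {
  100 * saturation_vapor_pressure(dewpoint_c) / saturation_vapor_pressure(temperature_c)
}

relative_humidity(temperature_c = 25, dewpoint_c = 15) # roughly 54%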
3 changes: 2 additions & 1 deletion R/transform_modis_ndvi.R
@@ -87,7 +87,8 @@ transform_modis_ndvi <- function(modis_ndvi_token,
doy = as.integer(lubridate::yday(date)),
month = as.integer(lubridate::month(date)),
year = as.integer(lubridate::year(date)),
source = "modis")
source = "modis") |>
select(-step)

# Save transformed rast as parquet
arrow::write_parquet(transformed_raster, transformed_file, compression = "gzip", compression_level = 5)
36 changes: 36 additions & 0 deletions R/transform_ndvi.R
@@ -0,0 +1,36 @@
transform_ndvi <- function(modis_ndvi_transformed,
sentinel_ndvi_transformed,
ndvi_transformed_directory,
overwrite = FALSE,
...) {

# ndvi_transformed_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed))
#
# years <- ndvi_transformed_dataset |> select(year) |> distinct() |> arrange(year) |> pull(year, as_vector = T)
#
# ndvi_transformed <- map(years, function(yr) {
#
# ndvi_transformed_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed)) |> filter(year == yr)
#
# # Set filename
# save_filename <- file.path(ndvi_transformed_directory, glue::glue("ndvi_transformed_{yr}.gz.parquet"))
# message(paste0("Combining ndvi sources for ", yr))
#
# # Check if file already exists and can be read
# error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)
#
# if(!is.null(error_safe_read_parquet(save_filename)) & !overwrite) {
# message("file already exists and can be loaded, skipping")
# return(save_filename)
# }
#
# arrow::write_parquet(ndvi_transformed_dataset |> filter(year == yr), save_filename)
#
# rm(ndvi_transformed_dataset)
#
# return(save_filename)
# })
#
# ndvi_transformed

}
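The body of transform_ndvi() is committed entirely commented out. Read literally, the disabled code would partition the combined sentinel and modis datasets by year and write one parquet file per year; a compact reconstruction of what it appears intended to do once enabled (untested, and omitting the overwrite check):

# Reconstruction of the commented-out body above, for orientation only
ndvi_dataset <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed))

years <- ndvi_dataset |>
  dplyr::select(year) |>
  dplyr::distinct() |>
  dplyr::arrange(year) |>
  dplyr::pull(year, as_vector = TRUE)

purrr::map(years, function(yr) {
  save_filename <- file.path(ndvi_transformed_directory,
                             glue::glue("ndvi_transformed_{yr}.gz.parquet"))
  arrow::write_parquet(ndvi_dataset |> dplyr::filter(year == yr), save_filename)
  save_filename
})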