
Commit

Fix LFS problem
n8layman committed Nov 18, 2024
1 parent b475f94 commit 06d2d8b
Showing 38 changed files with 7,661 additions and 3,416 deletions.
Binary file modified .env
Binary file not shown.
177 changes: 85 additions & 92 deletions R/calculate_forecasts_anomalies.R
@@ -1,8 +1,9 @@
#' Calculate and Save Anomalies from Forecast Data
#'
#' This function takes transformed ECMWF forecast and historical weather mean data,
#' calculates anomalies, and saves them in a specified directory. If the file already exists and `overwrite` is FALSE,
#' the existing file's path is returned. Otherwise, the existing file is overwritten.
#' This function takes transformed ECMWF forecast and historical weather mean data
#' and uses them to forecast weather anomalies for the remainder of the current month
#' and for any additional months covered by the forecast. It then saves the forecast
#' anomalies in a specified directory.
#'
#' @author Emma Mendelsohn
#'
@@ -28,123 +29,115 @@
#' overwrite = TRUE)
#'
#' @export
calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed_directory,
calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
weather_historical_means,
forecasts_anomalies_directory,
model_dates_selected,
lead_intervals,
overwrite = FALSE,
...) {

# Check that we're only working on one date at a time
stopifnot(length(model_dates_selected) == 1)

# Set filename
date_selected <- model_dates_selected
save_filename <- glue::glue("forecast_anomaly_{date_selected}.gz.parquet")
message(paste0("Calculating forecast anomalies for ", date_selected))
save_filename <- file.path(forecasts_anomalies_directory, glue::glue("forecast_anomaly_{model_dates_selected}.gz.parquet"))
message(paste0("Calculating forecast anomalies for ", model_dates_selected))

# Check if file already exists and can be read
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

if(!is.null(error_safe_read_parquet(file.path(forecasts_validate_directory, save_filename))) & !overwrite) {
if(!is.null(error_safe_read_parquet(save_filename)) & !overwrite) {
message("file already exists and can be loaded, skipping download")
return(file.path(forecasts_anomalies_directory, save_filename))
return(save_filename)
}

# Open dataset to transformed data
forecasts_transformed_dataset <- open_dataset(ecmwf_forecasts_transformed_directory)
forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |>
filter(base_date == floor_date(model_dates_selected, unit = "month"))

# Notes:

# Get the forecasts anomalies for selected dates, mapping over the lead intervals
lead_intervals_start <- c(0 , lead_intervals[-length(lead_intervals)]) # 0 to include current day in forecast
lead_intervals_end <- lead_intervals - 1 # -1 for 30 days total include start day
# 'Anomaly' is the scaled difference between the forecast mean and the historical mean.
# Values close to 0 mean the forecast temperature is near the historical mean.
# Values can be negative or positive. Weighting is used because the first day
# of the forecast will often fall part way through a month, and so will the
# last day. For example, a 30-day forecast window starting on 2020-03-20 runs
# through 2020-04-18 and covers 12 days in March (the 20th through the 31st)
# and 18 days in April (the 1st through the 18th). The 30-day forecast anomaly
# should account for the relative contributions of March and April.

anomalies <- map(1:length(lead_intervals_start), function(i){

# subset to start and end day of interval
start <- lead_intervals_start[i]
end <- lead_intervals_end[i]

lead_start_date <- date_selected + start
lead_end_date <- date_selected + end

# lead months for subsetting
lead_months <- as.character(c(i, i+1))

# this is the date from which the forecasts are made
baseline_date <- floor_date(date_selected, unit = "month")

# calculate weights
weight_a <- as.integer(days_in_month(lead_start_date) - day(lead_start_date)) + 1 # include current date
weight_b <- day(lead_end_date) - 1
# An easier way to do this is to make a list of every day from start to
# start + 30 - 1, work out each day's month, and join that to the forecast
# month from ecmwf_forecasts_transformed, so everything is handled in one pass.
# Then group by x and y and summarize the average of the data columns, mapping
# over each lead interval. A benefit of this approach is that it makes comparing
# to historical means easy: just find the historical means for each day and
# left join them in as well.

forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |>
filter(base_date == floor_date(model_dates_selected, unit = "month")) |> collect()

historical_means <- arrow::open_dataset(weather_historical_means)

forecasts_anomalies <- map_dfr(1:length(lead_intervals), function(i) {

# get weighted mean of forecasts means
lead_means <- forecasts_transformed_dataset |>
filter(data_date == baseline_date) |>
filter(lead_month %in% lead_months) |>
mutate(weight = case_when(lead_month == !!lead_months[1] ~ !!weight_a,
lead_month == !!lead_months[2] ~ !!weight_b)) |>
group_by(x, y, short_name) |>
summarize(lead_mean = sum(mean * weight)/ sum(weight)) |>
ungroup()
lead_interval_start <- c(0, lead_intervals[-length(lead_intervals)])[i]
lead_interval_end <- lead_intervals[i]

# get historical means for lead period, removing doy 366
lead_dates <- seq(lead_start_date, lead_end_date, by = "day")
lead_doys <- yday(lead_dates)
if(366 %in% lead_doys) {
if(tail(lead_doys, 1) == 366){
lead_doys <- lead_doys[lead_doys!=366]
lead_doys <- c(lead_doys, 1)
}else{
lead_doys <- lead_doys[lead_doys!=366]
lead_doys <- c(lead_doys, tail(lead_doys, 1) + 1)
}
}
message(glue::glue("Processing forecast interval {lead_interval_start}-{lead_interval_end} days out"))

doy_start <- head(lead_doys, 1)
doy_end <- tail(lead_doys, 1)
doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")
# Get a tibble of all the dates in the anomaly forecast range for the given lead interval
# and join in the historical weather mean data.
model_dates <- tibble(date = seq(from = model_dates_selected + lead_interval_start, to = model_dates_selected + lead_interval_end - 1, by = 1)) |>
mutate(doy = lubridate::yday(date), # Calculate day of year
month = month(date), # Extract month
year = year(date), # Extract year
lead_interval_start = lead_interval_start, # Store lead interval start
lead_interval_end = lead_interval_end) # Store lead interval end

historical_means <- read_parquet(weather_historical_means[str_detect(weather_historical_means, doy_range)])
assertthat::assert_that(nrow(historical_means) > 0)
# Join historical means based on day of year (doy)
model_dates <- model_dates |>
left_join(historical_means |> filter(doy >= min(model_dates$doy)) |> filter(doy < max(model_dates$doy)) |> collect(), by = c("doy"))

# calculate anomalies - a bit inefficient because arrow doesn't allow reshaping (should have done so in the transform function)
# NAs are expected because forecasts are for the whole continent, weather is just for areas of interest
# Join in forecast data based on x, y, month, and year.
# The historical data and forecast data _should_ have the same column
# names so differentiate with a suffix
model_dates <- model_dates |>
left_join(forecasts_transformed_dataset, by = c("x", "y", "month", "year"), suffix = c("_historical", "_forecast"))

temp_anomalies <- lead_means |>
filter(short_name == "2t") |>
left_join(historical_means |> select(x, y, contains("temperature")), by = c("x", "y")) |>
mutate(!!paste0("anomaly_temperature_forecast_", end) := lead_mean - historical_temperature_mean,
!!paste0("anomaly_temperature_scaled_forecast_", end) := (lead_mean - historical_temperature_mean)/historical_temperature_sd) |>
select(-short_name, -lead_mean, -starts_with("historical")) |>
filter(!is.na(!!sym(paste0("anomaly_temperature_forecast_", end))))
# Summarize by calculating the mean for each variable type (temperature, precipitation, relative_humidity)
# and across both historical data and forecast data over the days in the model_dates range
model_dates <- model_dates |>
group_by(x, y, lead_interval_start, lead_interval_end) |>
summarize(across(matches("temperature|precipitation|relative_humidity"), ~mean(.x)), .groups = "drop")

rh_anomalies <- lead_means |>
filter(short_name == "rh") |>
left_join(historical_means |> select(x, y, contains("humidity")), by = c("x", "y")) |>
mutate(!!paste0("anomaly_relative_humidity_forecast_", end) := lead_mean - historical_relative_humidity_mean,
!!paste0("anomaly_relative_humidity_scaled_forecast_", end) := (lead_mean - historical_relative_humidity_mean)/historical_relative_humidity_sd) |>
select(-short_name, -lead_mean, -starts_with("historical")) |>
filter(!is.na(!!sym(paste0("anomaly_relative_humidity_forecast_", end))))
# Calculate temperature anomalies
model_dates <- model_dates |>
mutate(anomaly_forecast_temperature = temperature_forecast - temperature_historical,
anomaly_forecast_scaled_temperature = anomaly_forecast_temperature / temperature_sd_historical)

precip_anomalies <- lead_means |>
filter(short_name == "tprate") |>
left_join(historical_means |> select(x, y, contains("precipitation")), by = c("x", "y")) |>
mutate(!!paste0("anomaly_precipitation_forecast_", end) := lead_mean - historical_precipitation_mean,
!!paste0("anomaly_precipitation_scaled_forecast_", end) := (lead_mean - historical_precipitation_mean)/historical_precipitation_sd) |>
select(-short_name, -lead_mean, -starts_with("historical")) |>
filter(!is.na(!!sym(paste0("anomaly_precipitation_forecast_", end))))
# Calculate precipitation anomalies
model_dates <- model_dates |>
mutate(anomaly_forecast_precipitation = precipitation_forecast - precipitation_historical,
anomaly_forecast_scaled_precipitation = anomaly_forecast_precipitation / precipitation_sd_historical)

left_join(temp_anomalies, rh_anomalies, by = c("x", "y")) |>
left_join(precip_anomalies, by = c("x", "y"))
# Calculate relative_humidity anomalies
model_dates <- model_dates |>
mutate(anomaly_forecast_relative_humidity = relative_humidity_forecast - relative_humidity_historical,
anomaly_forecast_scaled_relative_humidity = anomaly_forecast_relative_humidity / relative_humidity_sd_historical)

}) |>
reduce(left_join, by = c("x", "y")) |>
mutate(date = date_selected) |>
relocate(date)
# Clean up intermediate columns
model_dates |>
mutate(date = model_dates_selected) |>
select(x, y, date, 'lead_interval_start', 'lead_interval_end', starts_with("anomaly"))
})

# Save as parquet
write_parquet(anomalies, here::here(forecasts_anomalies_directory, save_filename), compression = "gzip", compression_level = 5)
# Write output to a parquet file
arrow::write_parquet(forecasts_anomalies, save_filename, compression = "gzip", compression_level = 5)

return(file.path(forecasts_anomalies_directory, save_filename))
return(save_filename)

}
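As a quick illustration (not part of the commit) of how the lead intervals and the month weighting described in the comments above play out, the sketch below assumes a hypothetical lead_intervals = c(30, 60, 90) and a model date of 2020-03-20, and counts how many days of each interval fall in each month:

# Illustrative sketch only; lead_intervals and the model date are assumed values.
library(dplyr)
library(lubridate)

lead_intervals <- c(30, 60, 90)
model_date <- as.Date("2020-03-20")

purrr::map_dfr(seq_along(lead_intervals), function(i) {
  start <- c(0, lead_intervals[-length(lead_intervals)])[i]
  end <- lead_intervals[i]
  tibble(date = seq(model_date + start, model_date + end - 1, by = "day")) |>
    count(interval = paste0(start, "-", end), month = month(date))
})
# For the 0-30 interval this gives 12 days in March and 18 in April, so averaging
# over those rows weights March by 12/30 and April by 18/30.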

2 changes: 1 addition & 1 deletion R/calculate_ndvi_anomalies.R
@@ -40,7 +40,7 @@ calculate_ndvi_anomalies <- function(ndvi_date_lookup,
ndvi_anomalies_filename <- file.path(ndvi_anomalies_directory, glue::glue("ndvi_anomaly_{date_selected}.gz.parquet"))

# Set up safe way to read parquet files
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

# Check if the output file already exists and can be read, and that we don't want to overwrite it.
if(!is.null(error_safe_read_parquet(ndvi_anomalies_filename)) & !overwrite) {
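The possibly() guard used throughout these functions turns a read error into NULL, so the overwrite check reduces to an is.null() test. A minimal sketch of the pattern (the file path here is made up):

# Sketch of the possibly() guard pattern; the path is illustrative.
library(purrr)

error_safe_read_parquet <- possibly(arrow::open_dataset, otherwise = NULL)

ds <- error_safe_read_parquet("data/ndvi_anomaly_2020-03-20.gz.parquet")
if (!is.null(ds)) {
  message("file exists and is readable, skipping")
} else {
  message("needs (re)processing")
}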
114 changes: 80 additions & 34 deletions R/calculate_ndvi_historical_means.R
@@ -21,12 +21,55 @@
#' days_of_year = c(100:200), lag_intervals = c(30, 60, 90), overwrite = TRUE)
#'
#' @export
calculate_ndvi_historical_means <- function(ndvi_historical_means_directory,
ndvi_date_lookup,
days_of_year,
lag_intervals,
overwrite = FALSE,
calculate_weather_historical_means <- function(nasa_weather_transformed,
weather_historical_means_directory,
...) {

# arrow::open_dataset() can handle multi-file datasets larger than
# will fit in memory
nasa_weather_data <- arrow::open_dataset(nasa_weather_transformed)

# Fast because we can avoid collecting until write_parquet
weather_historical_means <- map_vec(1:366, .progress = TRUE, function(i) {
filename <- file.path(weather_historical_means_directory,
glue::glue("weather_historical_mean_doy_{i}"))
nasa_weather_data |>
filter(doy == i) |>
group_by(x, y, doy) |>
summarize(across(matches("temperature|precipitation|humidity"), ~mean(.x)),
across(matches("temperature|precipitation|humidity"), ~sd(.x), .names = "{.col}_sd")) |>
arrow::write_parquet(filename)

filename
})

return(weather_historical_means)
}
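Both helper functions above rely on arrow's lazy evaluation: arrow::open_dataset() only reads file metadata, and the dplyr verbs build a query that is not executed until the result is written or collected. A self-contained sketch of that pattern (the temporary file and column names are invented for illustration):

# Minimal sketch of the lazy arrow/dplyr pattern; data and columns are invented.
library(dplyr)
library(arrow)

tmp <- tempfile(fileext = ".parquet")
arrow::write_parquet(data.frame(doy = rep(1:3, each = 2), temperature = rnorm(6)), tmp)

query <- arrow::open_dataset(tmp) |>   # no rows read yet, just file metadata
  filter(doy == 1) |>
  group_by(doy) |>
  summarize(temperature_mean = mean(temperature),
            temperature_sd = sd(temperature))

collect(query)   # the query is only evaluated here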


calculate_ndvi_historical_means <- function(sentinel_ndvi_transformed,
modis_ndvi_transformed,
ndvi_historical_means_directory,
ndvi_historical_means_AWS,
...) {

ndvi_data <- arrow::open_dataset(c(sentinel_ndvi_transformed, modis_ndvi_transformed))

# Fast because we can avoid collecting until write_parquet
ndvi_historical_means <- map_vec(1:366, .progress = TRUE, function(i) {
filename <- file.path(ndvi_historical_means_directory,
glue::glue("ndvi_historical_mean_doy_{i}"))
ndvi_data |>
filter(doy == i) |>
group_by(x, y, doy) |>
summarize(across(matches("temperature|precipitation|humidity"), ~mean(.x)),
across(matches("temperature|precipitation|humidity"), ~sd(.x), .names = "{.col}_sd")) |>
arrow::write_parquet(filename)

filename
})

return(ndvi_historical_means)

interval_length <- unique(diff(lag_intervals))

Expand All @@ -36,54 +79,57 @@ calculate_ndvi_historical_means <- function(ndvi_historical_means_directory,
dummy_date_start <- ymd("20210101") + doy_start - 1
dummy_date_end <- dummy_date_start + interval_length - 1
doy_end <- yday(dummy_date_end)

doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")

ndvi_historical_means_filename <- file.path(ndvi_historical_means_directory, glue::glue("historical_ndvi_mean_doy_{doy_start_frmt}_to_{doy_end_frmt}.gz.parquet"))

# Set up safe way to read parquet files
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

# Check if outbreak_history file exist and can be read and that we don't want to overwrite them.
if(!is.null(error_safe_read_parquet(ndvi_historical_means_filename)) & !overwrite) {
message(glue::glue("{basename(ndvi_historical_means_filename)} already exists and can be loaded, skipping download and processing."))
return(ndvi_historical_means_filename)
}

message(glue::glue("processing {ndvi_historical_means_filename}"))

# Get the relevant days of the year
doy_select <- yday(seq(dummy_date_start, dummy_date_end, by = "day"))

# Get relevant NDVI files and weights for the calculations
weights <- ndvi_date_lookup |>
mutate(lag_doy = map(lookup_day_of_year, ~. %in% doy_select)) |>
mutate(weight = unlist(map(lag_doy, sum))) |>
filter(weight > 0) |>
weights <- ndvi_date_lookup |>
mutate(lag_doy = map(lookup_day_of_year, ~. %in% doy_select)) |>
mutate(weight = unlist(map(lag_doy, sum))) |>
filter(weight > 0) |>
select(start_date, filename, weight)
ndvi_dataset <- open_dataset(weights$filename) |>
left_join(weights |> select(-filename))

ndvi_dataset <- arrow::open_dataset(weights$filename) |>
left_join(weights |> select(-filename))

# Calculate weighted means
historical_means <- ndvi_dataset |>
group_by(x, y) |>
summarize(historical_ndvi_mean = sum(ndvi * weight)/ sum(weight)) |>
historical_means <- ndvi_dataset |>
group_by(x, y) |>
summarize(historical_ndvi_mean = sum(ndvi * weight)/ sum(weight)) |>
ungroup()

# Calculate weighted standard deviations, using weighted mean from previous step
historical_sds <- ndvi_dataset |>
left_join(historical_means) |>
group_by(x, y) |>
summarize(historical_ndvi_sd = sqrt(sum(weight * (ndvi - historical_ndvi_mean)^2) / (sum(weight)-1)) ) |>
historical_sds <- ndvi_dataset |>
left_join(historical_means) |>
group_by(x, y) |>
summarize(historical_ndvi_sd = sqrt(sum(weight * (ndvi - historical_ndvi_mean)^2) / (sum(weight)-1)) ) |>
ungroup()

historical_means <- left_join(historical_means, historical_sds)

# Save as parquet
historical_means <- left_join(historical_means, historical_sds) |>
mutate(doy_start = doy_start,
doy_end = doy_end)

# Save as parquet
write_parquet(historical_means, ndvi_historical_means_filename, compression = "gzip", compression_level = 5)

return(ndvi_historical_means_filename)

}
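For reference, the weighted statistics above follow the standard formulas: the weighted mean is sum(ndvi * weight) / sum(weight), and the weighted standard deviation is sqrt(sum(weight * (ndvi - mean)^2) / (sum(weight) - 1)). A toy version of the same two-step group_by/summarize pattern (the values are invented):

# Toy example of the weighted mean / weighted sd calculation above; data are invented.
library(dplyr)

ndvi_toy <- tibble::tibble(
  x = 1, y = 1,
  ndvi = c(0.30, 0.45, 0.50),
  weight = c(10, 15, 5)  # days each composite contributes to the window
)

means <- ndvi_toy |>
  group_by(x, y) |>
  summarize(historical_ndvi_mean = sum(ndvi * weight) / sum(weight), .groups = "drop")

ndvi_toy |>
  left_join(means, by = c("x", "y")) |>
  group_by(x, y) |>
  summarize(historical_ndvi_sd =
              sqrt(sum(weight * (ndvi - historical_ndvi_mean)^2) / (sum(weight) - 1)),
            .groups = "drop")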

2 changes: 1 addition & 1 deletion R/calculate_weather_anomalies.R
@@ -49,7 +49,7 @@ calculate_weather_anomalies <- function(nasa_weather_transformed_directory,
}

# Open dataset to transformed data
weather_transformed_dataset <- open_dataset(nasa_weather_transformed_directory)
weather_transformed_dataset <- arrow::open_dataset(nasa_weather_transformed_directory)

# Get the lagged anomalies for selected dates, mapping over the lag intervals
lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)]) # 1 to start with previous day
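As a quick check (not part of the commit), assuming a hypothetical lag_intervals = c(30, 60, 90), the expression above produces the start day of each lag window:

# Illustration only; lag_intervals is an assumed value.
lag_intervals <- c(30, 60, 90)
c(1, 1 + lag_intervals[-length(lag_intervals)])
#> [1]  1 31 61
# i.e. each lag window starts the day after the previous one ends.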
