Lots of bug fixes to the data ingest pipeline. Only piece remaining is to double check the forecast anomaly target to ensure proper joining
n8layman committed Nov 24, 2024
1 parent aebac53 commit dbb23b2
Showing 23 changed files with 3,497 additions and 801 deletions.
Binary file modified .env
Binary file not shown.
3 changes: 2 additions & 1 deletion .gitignore
@@ -21,5 +21,6 @@ data/**/*.tif
data/elevation/*
data/slope_aspect/*
data/soil/*

notes/*
*.token

96 changes: 40 additions & 56 deletions R/AWS_get_folder.R
@@ -1,31 +1,21 @@
#' Download files from an AWS S3 bucket to a local folder
#' AWS S3 Bucket File Retrieval
#'
#' This function downloads files from a specified S3 bucket and prefix to a local folder.
#' It only downloads files that are not already present in the local folder.
#' Additionally, it ensures that AWS credentials and region are set in the environment.
#' The function retrieves files stored in an AWS S3 bucket to a local folder and removes faulty parquet files.
#'
#' @author Nathan Layman
#' @author Nathan C. Layman
#'
#' @param local_folder Character. The path to the local folder where files should be downloaded and the AWS prefix
#' @param ...
#' @param local_folder A string representing the local folder where the files will be downloaded
#' @param ... additional arguments not used by this function, included for generic function compatibility
#'
#' @return A list of files downloaded from AWS
#'
#' @note
#' The AWS environment variables `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, and `AWS_BUCKET_ID`
#' must be set correctly in order to access the S3 bucket. If any of these are missing, the function will stop with an error.
#' Files in the S3 bucket will be deleted if they cannot be successfully read as parquet files.
#' @return A vector of strings representing the paths of the downloaded files. If no files are found, NULL is returned.
#'
#' @note This function requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION environment variables to be set.
#' Access AWS S3 bucket contents, download files locally, clean faulty data, and return the list of downloaded files.
#'
#' @examples
#' \dontrun{
#' # Ensure the AWS environment variables are set in your system or .env file:
#' # AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, and AWS_BUCKET_ID
#' # Ensure to set AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)
#' AWS_get_folder(local_folder = "./data")
#'
#' # Download files from an S3 bucket folder to a local directory
#' downloaded_files <- AWS_get_folder("my/local/folder")
#' }
#'
#' @export
AWS_get_folder <- function(local_folder, ...) {

@@ -108,32 +98,29 @@ AWS_get_folder <- function(local_folder, ...) {
}
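
The body of AWS_get_folder() is collapsed in this view. A minimal sketch of the behaviour the documentation above describes — list the bucket contents, download anything missing locally, and drop files that cannot be read back as parquet — could look roughly like the following. The paws-style s3 client mirrors the one used by AWS_put_files() below; the helper name and the exact download loop are illustrative assumptions, not the committed implementation.

# Sketch only: illustrates the documented behaviour, not the committed code
AWS_get_folder_sketch <- function(local_folder, ...) {

  # Fail fast if the required AWS environment variables are missing
  required <- c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION", "AWS_BUCKET_ID")
  if (any(Sys.getenv(required) == "")) stop("AWS credentials, region, or bucket not set")

  s3 <- paws::s3()

  # List objects stored under the prefix that matches the local folder
  s3_contents <- s3$list_objects_v2(Bucket = Sys.getenv("AWS_BUCKET_ID"),
                                    Prefix = local_folder)$Contents
  s3_keys <- vapply(s3_contents, function(x) x$Key, character(1))

  downloaded_files <- c()

  for (key in s3_keys) {

    local_path <- file.path(local_folder, basename(key))

    # Only download files that are not already present locally
    if (!file.exists(local_path)) {
      obj <- s3$get_object(Bucket = Sys.getenv("AWS_BUCKET_ID"), Key = key)
      writeBin(obj$Body, local_path)
    }

    # Remove files that cannot be read back as parquet
    readable <- tryCatch({arrow::read_parquet(local_path); TRUE}, error = function(e) FALSE)
    if (readable) downloaded_files <- c(downloaded_files, local_path) else file.remove(local_path)
  }

  if (length(downloaded_files) > 0) downloaded_files else NULL
}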


#' Upload and Sync Files with AWS S3
#' Upload Transformed Files to AWS
#'
#' This function synchronizes a local folder with an AWS S3 bucket. It checks for AWS credentials,
#' lists existing files in the S3 bucket, and compares them with the local files to upload new files
#' or remove files that are no longer needed.
#'
#' @author Nathan Layman
#' This function uploads transformed files to AWS given a list of files and a target local folder. It supports continuation tokens for handling large numbers of files. It also checks for existing files on AWS and in the local folder before uploading, providing informative messages about the upload process.
#'
#' @param transformed_file_list A character vector of file paths that should be present on AWS S3.
#' @param local_folder A character string specifying the path to the local folder to be synced with AWS S3.
#' @author Nathan C. Layman
#'
#' @examples
#' \dontrun{
#' AWS_put_files(transformed_file_list = c("file1.parquet", "file2.parquet"),
#' local_folder = "path/to/local/folder")
#' }
#' @param transformed_file_list A character vector of filenames that have been transformed and are to be uploaded to AWS S3. Filenames should be base names, not full paths.
#' @param local_folder A character string indicating the local directory that contains the transformed files to be uploaded to AWS S3.
#' @param ... Additional arguments not used by this function.
#'
#' @return A character vector of messages indicating the outcomes of trying to upload each file in the transformed_file_list to AWS.
#'
#' @note The function uses AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION environment variables to access AWS. These should be set prior to running this function.
#'
#' @return A list of actions taken
#' @examples
#' AWS_put_files(transformed_file_list = c("file1.csv", "file2.csv"),
#' local_folder = "./transformed_data")
#'
#' @export
AWS_put_files <- function(transformed_file_list,
local_folder,
...) {

transformed_file_list <- basename(transformed_file_list |> unlist())

# Check if AWS credentials and region are set in the environment
if (any(Sys.getenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")) == "")) {
msg <- paste(
@@ -166,47 +153,44 @@ AWS_put_files <- function(transformed_file_list,
}

# Get files in local folder
local_folder_files <- list.files(path = local_folder, recursive = TRUE)
local_folder_files <- list.files(path = local_folder, recursive = TRUE, full.names = T)

# Collect outcomes
outcome <- c()
outcomes <- c()

# Walk through local_folder_files
for(file in local_folder_files) {

# Is the file in the transformed_file_list?
if(file %in% transformed_file_list) {

# Is the file already on AWS?
if(file %in% s3_files) {

outcome <- c(outcome, glue::glue("{file} already present on AWS"))

} else {

outcome <- c(outcome, glue::glue("Uploading {file} to AWS"))

# Put the file on S3
s3_upload <- s3$put_object(
Body = file.path(local_folder, file),
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file.path(local_folder, file))
}
# Put the file on S3
s3_upload <- s3$put_object(
Body = file,
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file)

outcome <- glue::glue("Uploading {file} to AWS")

} else {

# Remove the file from AWS if it's present in the folder and on AWS
# but not in the list of successfully transformed files. This file is
# not relevant to the pipeline
if(file.path(local_folder, file) %in% s3_files) {
if(file %in% s3_files) {

outcome <- c(outcome, glue::glue("Cleaning up dangling file {file} from AWS"))
outcome <- glue::glue("Cleaning up dangling file {file} from AWS")

# Remove the file from AWS
s3_delete_receipt <- s3$delete_object(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file.path(local_folder, file))
} else {
next
}
}
message(outcome)
outcomes <- c(outcomes, outcome)
}

outcome
59 changes: 42 additions & 17 deletions R/calculate_forecasts_anomalies.R
@@ -65,15 +65,32 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
# contributions of March and April.

# An easier way to do this is to just make a list of every day from start to
# start + 30 - 1. Figure out the month and join to forecast month from
# start + 30 - 1. Figure out the year and month and join to forecast month from
# ecmwf_forecasts_transformed. That way we could do all the things at once.
# Then group by x, y, and summarize average of data columns. Map over each
# lead interval and done. A benefit of this approach is that it makes
# comparing to historical means easy. Just find the historical means for each
# day then left join that in by month as well.

forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |>
filter(base_date == floor_date(model_dates_selected, unit = "month"))
# Updated notes after discussion with Noam
# We want current date, current ndvi, forecast amount in days, forecast weather, forecast outbreak history (check this), and all the static layers. So we don't want wide weather forecast but long.

# Get a tibble of all the dates in the anomaly forecast range for the given lead interval
model_dates <- tibble(date = seq(from = model_dates_selected + lead_interval_start,
to = model_dates_selected + lead_interval_end - 1, by = 1)) |>
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = as.integer(lubridate::month(date)), # Extract month
year = as.integer(lubridate::year(date))) # Extract year

# Get the relevant forecast data. From the most recent base_date that came
# before the model_date selected.
forecasts_transformed_dataset <- arrow::open_dataset(ecmwf_forecasts_transformed) |> filter(base_date <= model_dates_selected)
relevant_base_date <- forecasts_transformed_dataset |>
select(base_date) |>
distinct() |>
arrange(base_date) |>
pull(base_date, as_vector = TRUE) |>
max() # Keep only the single most recent base_date at or before the selected model date
forecasts_transformed_dataset <- forecasts_transformed_dataset |> filter(base_date == relevant_base_date)

historical_means <- arrow::open_dataset(weather_historical_means)

@@ -85,51 +102,59 @@
message(glue::glue("Processing forecast interval {lead_interval_start}-{lead_interval_end} days out"))

# Get a tibble of all the dates in the anomaly forecast range for the given lead interval
model_dates <- tibble(date = seq(from = model_dates_selected + lead_interval_start, to = model_dates_selected + lead_interval_end - 1, by = 1)) |>
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = month(date), # Extract month
year = year(date), # Extract year
lead_interval_start = lead_interval_start, # Store lead interval duration
forecast_anomaly <- model_dates |>
mutate(lead_interval_start = lead_interval_start, # Store lead interval duration
lead_interval_end = lead_interval_end) # Store lead interval duration

# Join historical means based on day of year (doy)
model_dates <- historical_means |> filter(doy >= min(model_dates$doy)) |>
right_join(model_dates, by = c("doy"))
forecast_anomaly <- historical_means |> filter(doy >= min(model_dates$doy)) |>
right_join(model_dates, by = c("doy")) |> relocate(-matches("precipitation|temperature|humidity"))

# Join in forecast data based on x, y, month, and year.
# The historical data and forecast data _should_ have the same column
# names so differentiate with a suffix
model_dates <- model_dates |>
forecast_anomaly <- forecast_anomaly |>
left_join(forecasts_transformed_dataset, by = c("x", "y", "month", "year"), suffix = c("_historical", "_forecast"))


# Summarize by calculating the mean for each variable type (temperature, precipitation, relative_humidity)
# and across both historical data and forecast data over the days in the model_dates range
model_dates <- model_dates |>
forecast_anomaly <- forecast_anomaly |>
group_by(x, y, lead_interval_start, lead_interval_end) |>
summarize(across(matches("temperature|precipitation|relative_humidity"), ~mean(.x)), .groups = "drop")


# Calculate temperature anomalies
model_dates <- model_dates |>
forecast_anomaly <- forecast_anomaly |>
mutate(anomaly_forecast_temperature = temperature_forecast - temperature_historical,
anomaly_forecast_scaled_temperature = anomaly_forecast_temperature / temperature_sd_historical)

# Calculate precipitation anomalies
model_dates <- model_dates |>
forecast_anomaly <- forecast_anomaly |>
mutate(anomaly_forecast_precipitation = precipitation_forecast - precipitation_historical,
anomaly_forecast_scaled_precipitation = anomaly_forecast_precipitation / precipitation_sd_historical)

# Calculate relative_humidity anomalies
model_dates <- model_dates |>
forecast_anomaly <- forecast_anomaly |>
mutate(anomaly_forecast_relative_humidity = relative_humidity_forecast - relative_humidity_historical,
anomaly_forecast_scaled_relative_humidity = anomaly_forecast_relative_humidity / relative_humidity_sd_historical)

# Clean up intermediate columns
model_dates |>
forecast_anomaly |>
mutate(date = model_dates_selected) |>
select(x, y, date, 'lead_interval_start', 'lead_interval_end', starts_with("anomaly")) |>
select(x, y, date, doy, month, year, 'lead_interval_start', 'lead_interval_end', starts_with("anomaly")) |>
collect()
})

# Change forecast lead and start to just forecast weather column and forecast days. Include zero here. I don't think NASA weather is necessary then.
forecasts_anomalies <- forecasts_anomalies |>
select(-lead_interval_start) # |>
# pivot_wider(
# names_from = lead_interval_end,
# values_from = contains("anomaly"),
# names_sep = "_"
# )

# Write output to a parquet file
arrow::write_parquet(forecasts_anomalies, save_filename, compression = "gzip", compression_level = 5)

5 changes: 4 additions & 1 deletion R/calculate_ndvi_anomalies.R
@@ -64,7 +64,10 @@ calculate_ndvi_anomalies <- function(sentinel_ndvi_transformed,

# Remove intermediate columns
ndvi_transformed_dataset <- ndvi_transformed_dataset |>
select(x, y, date, starts_with("anomaly"))
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = as.integer(lubridate::month(date)), # Extract month
year = as.integer(lubridate::year(date))) |> # Extract year
select(x, y, date, doy, month, year, starts_with("anomaly"))

# Save as parquet
arrow::write_parquet(ndvi_transformed_dataset, save_filename, compression = "gzip", compression_level = 5)
5 changes: 4 additions & 1 deletion R/calculate_weather_anomalies.R
@@ -77,7 +77,10 @@ calculate_weather_anomalies <- function(nasa_weather_transformed,

# Remove intermediate columns
weather_transformed_dataset <- weather_transformed_dataset |>
select(x, y, date, starts_with("anomaly"))
mutate(doy = as.integer(lubridate::yday(date)), # Calculate day of year
month = as.integer(lubridate::month(date)), # Extract month
year = as.integer(lubridate::year(date))) |> # Extract year
select(x, y, date, doy, month, year, starts_with("anomaly"))

# Save as parquet
arrow::write_parquet(weather_transformed_dataset, save_filename, compression = "gzip", compression_level = 5)
36 changes: 25 additions & 11 deletions R/download_sentinel_ndvi.R
@@ -22,26 +22,40 @@
#'
#' @export
download_sentinel_ndvi <- function(sentinel_ndvi_api_parameters,
raw_filename) {
raw_filename,
sentinel_ndvi_token_file = "sentinel.token") {

product_id <- sentinel_ndvi_api_parameters$id
message(paste0("Downloading ", raw_filename))

auth <- httr::POST("https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token",
body = list(
grant_type = "password",
username = Sys.getenv("COPERNICUS_USERNAME"),
password = Sys.getenv("COPERNICUS_PASSWORD"),
client_id = "cdse-public"),
encode = "form")
# Read in sentinel token
sentinel_ndvi_token <- readLines(sentinel_ndvi_token_file)

url <- glue::glue('https://zipper.dataspace.copernicus.eu/odata/v1/Products({product_id})/$value')

response <- httr::GET(url, httr::add_headers(Authorization = paste("Bearer", httr::content(auth)$access_token)),
httr::write_disk(raw_filename, overwrite = TRUE))
i <- 0
response <- list()
response$status_code <- 401

while(response$status_code != 200 && i < 6) {
response <- httr::GET(url, httr::add_headers(Authorization = paste("Bearer", sentinel_ndvi_token)),
httr::write_disk(raw_filename, overwrite = TRUE))
httr::message_for_status(response)
if(response$status_code == 401) {
get_sentinel_ndvi_token(filename = sentinel_ndvi_token_file)
sentinel_ndvi_token <- readLines(sentinel_ndvi_token_file)
}
Sys.sleep(ifelse(2^i > 60, 60, 2^i))
i <- i + 1 # Increment the retry counter so the backoff grows and the six-attempt cap is reached
}

httr::stop_for_status(response)

# Remove old nc file if it exists
file.remove(paste0(tools::file_path_sans_ext(raw_filename), ".nc"))
# Construct the file path of the .nc file
nc_file <- paste0(tools::file_path_sans_ext(raw_filename), ".nc")
if (file.exists(nc_file)) {
file.remove(nc_file)
}

# Unzip the new download and rename
unzip(raw_filename,
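
The refreshed retry loop above relies on get_sentinel_ndvi_token(), which is not shown in this hunk. A plausible sketch, assuming it simply wraps the Copernicus password-grant request that the removed inline code performed and caches the bearer token to the sentinel.token file (the real helper may differ):

# Sketch only: assumed shape of the token helper referenced above
get_sentinel_ndvi_token_sketch <- function(filename = "sentinel.token") {

  # Same password-grant request the removed inline code used
  auth <- httr::POST("https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token",
                     body = list(
                       grant_type = "password",
                       username = Sys.getenv("COPERNICUS_USERNAME"),
                       password = Sys.getenv("COPERNICUS_PASSWORD"),
                       client_id = "cdse-public"),
                     encode = "form")

  httr::stop_for_status(auth)

  # Cache the bearer token so downloads can reuse it until it expires (hence the 401 refresh above)
  writeLines(httr::content(auth)$access_token, filename)

  invisible(filename)
}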
31 changes: 15 additions & 16 deletions R/get_outbreak_distance_matrix.R
@@ -1,27 +1,26 @@
#' Calculate Outbreak Distance Matrix
#' Compute the Outbreak Distance Matrix
#'
#' This function calculates a distance matrix for outbreaks based on given parameters. The function takes
#' outbreak data, a raster template, a distance in kilometers, and a beta distribution values as arguments.
#' It then calculates the distance values, applies conditions, and returns a matrix of distances.
#' This function computes the geographical distance between outbreak locations and
#' raster grid cells, transforming these distances into a connectivity matrix.
#'
#' @author Nathan C. Layman
#'
#' @param wahis_outbreaks Dataframe containing outbreak data.
#' @param wahis_raster_template A raster template for structuring the data.
#' @param within_km The maximum distance in kilometers to be considered for the matrix. Default is 500.
#' @param beta_dist A value defining beta distribution. Default is 0.01.
#' @param wahis_outbreaks A dataframe of outbreak locations with columns 'longitude' and 'latitude'.
#' @param wahis_raster_template A raster template on which the function calculates distances.
#' @param within_km Numeric, the radius within which to calculate distances. Defaults to 500.
#' @param beta_dist Numeric, a coefficient used to transform distances into a connectivity matrix. Defaults to 0.01.
#'
#' @return A matrix containing calculated distance values based on outbreak data.
#' @return A matrix with rows representing grid cells and columns representing outbreak locations,
#' where each cell's value is a transformed measure of its distance from the outbreak.
#'
#' @note This function calculates distance values based on vincenty measure, applies conditions, and returns a
#' matrix of distances. If the distance is larger than a given threshold, it is set as NA. Distance values are then adjusted
#' based on a beta distribution, and NA values are set as 0.
#' @note Distances are calculated with the Vincenty measure,
#' transformed using an exponential decay function, and outside of the specified radius are set to zero.
#'
#' @examples
#' get_outbreak_distance_matrix(wahis_outbreaks = outbreak_data,
#' get_outbreak_distance_matrix(wahis_outbreaks = wahis_df,
#' wahis_raster_template = raster_template,
#' within_km = 100,
#' beta_dist = 0.05)
#' within_km = 500,
#' beta_dist = 0.01)
#'
#' @export
get_outbreak_distance_matrix <- function(wahis_outbreaks, wahis_raster_template, within_km = 500, beta_dist = 0.01) {
@@ -46,4 +45,4 @@ get_outbreak_distance_matrix <- function(wahis_outbreaks, wahis_raster_template,
dist_mat[is.na(dist_mat)] <- 0

dist_mat
}
}
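
Only the tail of get_outbreak_distance_matrix() is visible in this diff. Based on the documentation above (Vincenty distances, an exponential decay transform, zeros outside the radius), the core computation might look roughly like the sketch below; the use of terra for the raster template and geosphere::distm() for the pairwise distances are assumptions rather than the committed code.

# Sketch only: reconstructs the documented computation, not the committed body
get_outbreak_distance_matrix_sketch <- function(wahis_outbreaks, wahis_raster_template,
                                                within_km = 500, beta_dist = 0.01) {

  # Coordinates of every cell in the raster template (rows of the output matrix)
  cell_xy <- terra::xyFromCell(wahis_raster_template,
                               seq_len(terra::ncell(wahis_raster_template)))

  # Pairwise Vincenty (sphere) distances in km: grid cells by outbreak locations
  dist_mat <- geosphere::distm(cell_xy,
                               as.matrix(wahis_outbreaks[, c("longitude", "latitude")]),
                               fun = geosphere::distVincentySphere) / 1000

  # Mask cells beyond the radius, apply exponential decay, then zero out the masked cells
  dist_mat[dist_mat > within_km] <- NA
  dist_mat <- exp(-beta_dist * dist_mat)
  dist_mat[is.na(dist_mat)] <- 0

  dist_mat
}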