#' Get a tibble of tasks from ECMWF API
#' This function retrieves tasks from the ECMWF (European Centre for Medium-Range Weather Forecasts) API.
#' It sends a GET request using the `httr` package and returns the content of the response as a data frame.
#' @author Nathan Layman
#' @param url A character string specifying the base URL for the ECMWF CDS tasks API.
#' Defaults to "".
#' @return A data frame with the tasks retrieved from ECMWF.
#' @export
get_ecwmf_tasks <- function(url = "") {
response <- httr::GET(
httr::authenticate(Sys.getenv("ECMWF_USERID"), Sys.getenv("ECMWF_TOKEN"))

httr::content(response) |> bind_rows()

#' Clear ECMWF Queued Tasks
#' This function retrieves the currently queued tasks from the ECMWF (European Centre for Medium-Range Weather Forecasts) CDS (Climate Data Store)
#' API and deletes them using their request IDs.
#' @author Nathan Layman
#' @param url A character string specifying the base URL for the ECMWF CDS tasks API.
#' Defaults to "".
#' @return A data frame with the tasks retrieved from ECMWF.
#' @export
clear_ecwmf_tasks <- function(url = "") {

tasks_to_clear <- get_ecwmf_tasks() |>
filter(state == "queued") |>
mutate(request_id = paste0(url, request_id)) |>

request <- walk(tasks_to_clear, ~httr::DELETE(.x, httr::authenticate(Sys.getenv("ECMWF_USERID"), Sys.getenv("ECMWF_TOKEN"))))


submit_ecwmf_request <- function(request,
url = "") {

response <- httr::PUT(jsonlite::toJSON(request), httr::authenticate(Sys.getenv("ECMWF_USERID"), Sys.getenv("ECMWF_TOKEN")))

response <- POST(
url = paste0(ecmwf_url, "/datasets/data/era5"), # Update endpoint as needed
Authorization = paste("Bearer", api_key),
"Content-Type" = "application/json"
body = toJSON(query), # Convert the query to JSON format
encode = "json" # Encode the body as JSON

# Check if the request was successful
if (status_code(response) == 200) {
# Save the response content to the desired file
writeBin(content(response, "raw"), "")
cat("File saved successfully.\n")
} else {
cat("Failed to fetch data. Status code:", status_code(response), "\n")
#' Get and parse gdalinfo metadata from a grib file without relying on grib_ls
#' @author Nathan Layman
#' @param file
#' @return
#' @export
#' @examples
get_grib_metadata <- function(raw_file) {

gdalinfo_text <- terra::describe(raw_file, options = "json")

# Remove all text up to first BAND ^GEOGCRS
metadata_start_index <- grep("^Band|^BAND", gdalinfo_text)[1]
metadata_text <- gdalinfo_text[metadata_start_index:length(gdalinfo_text)]
metadata_text <- metadata_text[!str_detect(metadata_text, "^Band|BAND|Metadata|METADATA")]

metadata <- map_dfr(metadata_text, ~stringr::str_split(.x[1], "=")[[1]] |>
stringr::str_squish() |> setNames(c("name", "value"))) |>
mutate(band = cumsum(name == "Description")) |>
pivot_wider(names_from = "name", values_from = "value")

#' Check error recorded in meta file for a target
#' @param branch
#' @return
#' @export
#' @examples
tar_get_error <- function(branch) {
branch_name <- deparse(substitute(branch))
tar_meta() |> filter(name == branch_name) |> pull(error)
#' .. content for \description{} (no empty lines) ..
#' Transform ECMWF Seasonal Forecast Data
#' .. content for \details{} ..
#' This function downloads ECMWF seasonal forecast data, transforms it into parquet format, and performs basic checks
#' on the downloaded GRIB files. It leverages the ECMWF API to fetch forecast data for a specific system, year, and set of variables.
#' @author Nathan Layman, Emma Mendelsohn
#' @title
#' @param ecmwf_forecasts_downloaded
#' @param ecmwf_forecasts_transformed_directory
#' @param continent_raster_template
#' @param overwrite
#' @return
#' @author Emma Mendelsohn
#' @param ecmwf_forecasts_api_parameters A list containing the parameters for the ECMWF API request such as system, year, month, variables, etc.
#' @param local_folder Character. The path to the local folder where transformed files will be saved. Defaults to `ecmwf_forecasts_transformed_directory`.
#' @param continent_raster_template The path to the raster file used as a template for continent-level spatial alignment.
#' @param n_workers Integer. The number of workers to use for parallel processing, defaults to 2.
#' @return Returns the path to the transformed parquet file if successful, or stops the function if there is an error.
#' @details The function checks if the transformed file already exists for the given year and system. If it exists and is valid, it returns the file path.
#' If not, it downloads the raw GRIB file using the ECMWF API, attempts to load and transform it, and saves the output as a parquet file. The function
#' checks file validity at multiple stages. Notes: Must accept licenses by manually downloading a dataset from here:
#' @export
transform_ecmwf_forecasts <- function(ecmwf_forecasts_downloaded,
n_workers = 1,
overwrite = FALSE) {

# Get filename for saving from the raw data
filename <- tools::file_path_sans_ext(basename(ecmwf_forecasts_downloaded))
save_filename <- glue::glue("{filename}.gz.parquet")

# Check if file already exists
existing_files <- list.files(ecmwf_forecasts_transformed_directory)
message(paste0("Transforming ", save_filename))
if(save_filename %in% existing_files & !overwrite){
message("file already exists, skipping transform")
return(file.path(ecmwf_forecasts_transformed_directory, save_filename))
transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
local_folder = ecmwf_forecasts_transformed_directory,
n_workers = 2,
time_out = 3600) {

# Check that ecmwf_forecasts_api_parameters is only one row
stopifnot(nrow(ecmwf_forecasts_api_parameters) == 1)

# Extract necessary details from the ecmwf paramters
system <- ecmwf_forecasts_api_parameters$system
year <- ecmwf_forecasts_api_parameters$year
month <- unlist(ecmwf_forecasts_api_parameters$month)

# Create an error safe way to test if the parquet file can be read, if it exists
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)

transformed_file <- gsub("\\.grib", "\\.gz\\.parquet", raw_file)

# Check if transformed file already exists and can be loaded.
# If so return file name and path unless it's the current year
if(!is.null(error_safe_read_parquet(transformed_file)) && year < year(Sys.time())) return(transformed_file)

# ensure local_folder ends in '/'

# If the transformed file doesn't exist download what we need from ECMWF
# The metadata returned in grib formats sucks and requires an externam dependancy
# so download each piece separtatly.
raw_files <- expand.grid(product_type = unlist(ecmwf_forecasts_api_parameters$product_types),
variable = unlist(ecmwf_forecasts_api_parameters$variables)) |>
rowwise() |>
mutate(raw_file = file.path(local_folder, glue::glue("ecmwf_seasonal_forecast_sys{system}_{year}_{product_type}_{variable}.grib")))

request_list <- purrr::pmap(raw_files, function(product_type, variable, raw_file) {

originating_centre = "ecmwf",
system = system,
variable = variable, # This can't (easily) be extracted from terra::describe()
product_type = product_type, # This can't be extracted from terra::describe()
year = year,
month = month,
leadtime_month = unlist(ecmwf_forecasts_api_parameters$leadtime_months), # This can be extracted from terra::describe()
area = round(unlist(ecmwf_forecasts_api_parameters$spatial_bounds), 1), # This can be extracted from terra::describe()
format = "grib",
dataset_short_name = "seasonal-monthly-single-levels",
target = basename(raw_file)


ecmwfr::wf_set_key(user = Sys.getenv("ECMWF_USERID"), key = Sys.getenv("ECMWF_TOKEN"))

ecmwf_files <- ecmwfr::wf_request_batch(request = request_list,
user = Sys.getenv("ECMWF_USERID"),
workers = n_workers,
path = local_folder,
time_out = time_out,
total_timeout = length(request_list) * time_out/n_workers)

# Verify that terra can open all the saved grib files. If not return NULL to try again next time
error_safe_read_rast <- possibly(terra::rast, NULL)
raw_gribs = map(ecmwf_files, ~error_safe_read_rast(.x))

# If not remove the files and stop
if(any(is.null(raw_gribs))) {
stop(glue::glue("At least one of the grib files for {raw_file} could not be loaded after download."))

# Read in with terra
grib <- rast(ecmwf_forecasts_downloaded)

# Read in continent template raster
continent_raster_template <- rast(continent_raster_template)

# Get associated metadata and remove non-df rows
grib_meta <- system(paste("grib_ls", ecmwf_forecasts_downloaded), intern = TRUE)
remove <- c(1, (length(grib_meta)-2):length(grib_meta))
grib_meta <- system(paste("grib_ls", raw_file), intern = TRUE)
remove <- c(1, (length(grib_meta)-2):length(grib_meta))
grib_meta <- grib_meta[-remove]

# Processing metadata to join with actual data
meta <- read.table(text = grib_meta, header = TRUE) |>
as_tibble() |>
janitor::clean_names() |>
mutate(variable_id = as.character(glue::glue("{data_date}_step{step_range}_{data_type}_{short_name}"))) |>
mutate(data_date = ymd(data_date)) |>
select(-grid_type, -packing_type, -level, -type_of_level, -centre, -edition)

# Almost got rid of grib_ls dependency but can't get data_type from gdalinfo.
# Though it looks like we're only using the mean?
grib_meta <- pmap_dfr(raw_files, function(product_type, variable, raw_file) {
get_grib_metadata(raw_file) |>
mutate(step_range = as.numeric(GRIB_FORECAST_SECONDS) / 3600, # forecast step in hours from seconds
data_date = as.POSIXct(as.numeric(GRIB_REF_TIME), origin = "1970-01-01", tz = "UTC"), # Forecasting out from
short_name = stringr::str_to_sentence(GRIB_ELEMENT),
data_type = product_type,
variable = variable,
variable_id = as.character(glue::glue("{data_date}_step{step_range}_{data_type}_{short_name}"))) |>
dplyr::select(data_date, step_range, data_type, variable, short_name, variable_id)

raw_grib <- terra::rast(raw_gribs)
# Rename layers with the metadata names
names(grib) <- meta$variable_id

# Select only the means columns
grib_subset <- subset(grib, which(str_detect(names(grib), "fcmean")))
names(raw_grib) <- grib_meta$variable_id

# Units conversions
# 2d = "2m_dewpoint_temperature" = 2 metre dewpoint temperature K
# 2t = "2m_temperature" = 2 metre temperature K
# 2t = "2m_temperature" = 2 metre temperature K
# tprate = "total_precipitation" = Total precipitation m/s

Expand Down Expand Up @@ -135,6 +191,4 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_downloaded,
write_parquet(grib_means, here::here(ecmwf_forecasts_transformed_directory, save_filename), compression = "gzip", compression_level = 5)

return(file.path(ecmwf_forecasts_transformed_directory, save_filename))

#' (parquet file), aligning it with a continental raster template. If the transformed file
#' already exists and can be read, it will return the existing file.
#' @author Nathan Layman
#' @author Nathan Layman, Emma Mendelsohn
#' @param modis_ndvi_token Character. The authentication token required for the AppEEARS API.
#' @param modis_ndvi_bundle_request List. Contains the `file_name`, `task_id`, and `file_id` from the AppEEARS bundle request for MODIS NDVI data.
Expand Down Expand Up @@ -46,7 +46,6 @@ transform_modis_ndvi <- function(modis_ndvi_token,
response <- httr::GET(paste("", task_id, '/', file_id, sep = ""),
httr::write_disk(raw_file, overwrite = TRUE), httr::progress(), httr::add_headers(Authorization = modis_ndvi_token))

# Verify rast can open the saved raster file. If not return NULL
error_safe_read_rast <- possibly(terra::rast, NULL)
raw_raster = error_safe_read_rast(raw_file)
#' based on a continent raster template, and saves the resulting dataset as a parquet file. It checks if the
#' transformed file already exists and avoids redundant data downloads and processing.
#' @author Nathan Layman
#' @author Nathan Layman, Emma Mendelsohn
#' @param nasa_weather_coordinates Dataframe. A dataframe containing columns of coordinates for the bounding box (x_min, y_min, x_max, y_max) to download weather data.
#' @param nasa_weather_year Integer. The year for which to download and transform the weather data.
Expand Down

