Skip to content

Commit

Permalink
Add in lagged data
Browse files Browse the repository at this point in the history
  • Loading branch information
n8layman committed Dec 13, 2024
1 parent 426c793 commit 2fc4228
Show file tree
Hide file tree
Showing 12 changed files with 3,162 additions and 2,069 deletions.
Binary file modified .env
Binary file not shown.
1 change: 1 addition & 0 deletions R/calculate_forecasts_anomalies.R
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
summarize(across(matches("temperature|precipitation|relative_humidity"), ~mean(.x, na.rm = T)), .groups = "drop")

# Calculate temperature anomalies
# scaled requires non na values for SD which means there must be variation in temp at that site.
forecast_anomaly <- forecast_anomaly |>
mutate(anomaly_forecast_temperature = temperature_forecast - temperature_historical,
anomaly_forecast_scaled_temperature = anomaly_forecast_temperature / temperature_sd_historical)
Expand Down
12 changes: 4 additions & 8 deletions R/file_partition_duckdb.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,8 @@ file_partition_duckdb <- function(explanatory_variable_sources, # A named, neste

parquet_list <- glue::glue("{parquet_list} {parquet_filter}")

# Filter if the type is dynamic to reduce as much as possible the memory footprint
if(!is.null(file_schemas[[1]]$year)) parquet_list <- glue::glue("{parquet_list} WHERE year = {year(model_dates_selected)}")

# Check if all schemas are identical
if(unified_schema) {

parquet_list <- glue::glue("{parquet_list} {parquet_filter}")
# If all schema are identical: union all files
parquet_list <- paste(parquet_list, collapse = " UNION ALL ")

Expand All @@ -100,11 +95,12 @@ file_partition_duckdb <- function(explanatory_variable_sources, # A named, neste
})

# Set up a natural inner join for all the tables and output the result to file(s)
query <- glue::glue("COPY (SELECT * FROM {paste(names(explanatory_variable_sources), collapse = ' NATURAL JOIN ')}) TO '{save_filename}' (FORMAT 'parquet', CODEC 'gzip', ROW_GROUP_SIZE 100_000)")
query <- glue::glue("SELECT * FROM {paste(names(explanatory_variable_sources), collapse = ' NATURAL JOIN ')}")

# Execute the join
result <- DBI::dbExecute(con, query)
message(result)
result <- DBI::dbGetQuery(con, query) |> as_tibble() |> relocate(x, y, date, doy, year, month)

arrow::write_parquet(result, save_filename, compression = "gzip", compression_level = 5)

# Clean up the database connection
duckdb::dbDisconnect(con)
Expand Down
47 changes: 34 additions & 13 deletions R/lag_data.R
Original file line number Diff line number Diff line change
@@ -1,30 +1,51 @@
lag_data <- function(data_files,
lag_intervals,
model_dates_selected,
lagged_data_directory,
basename_template = "lagged_data_{model_dates_selected}.gz.parquet",
overwrite = TRUE,
...) {

# Check that we're only working on one date at a time
stopifnot(length(model_dates_selected) == 1)

# Set filename
save_filename <- file.path(lagged_data_directory, glue::glue(basename_template))

# Check if file already exists and can be read
error_safe_read_parquet <- possibly(arrow::open_dataset, NULL)

if(!is.null(error_safe_read_parquet(save_filename)) & !overwrite) {
message("file already exists and can be loaded, skipping download")
return(save_filename)
}

# The goal of this is to figure out the average of the data column over the interval
# Find dates at start and end interval back from date
# Group by x, y, start_interval, end_interval, and take the mean don't forget na.rm = T
message(glue::glue("calculating lagged data for {dirname(data_files[1])} starting from {model_dates_selected}"))

data <- arrow::open_dataset(data_files)
lagged_data <- map2_dfr(tail(lag_intervals,-1), head(lag_intervals,-1), function(lag_interval_start, lag_interval_end) {

lagged_data <- map_dfr(1:(length(lag_intervals) - 1), function(i) {

start_date = model_dates_selected - days(lag_intervals[i])
end_date = model_dates_selected - days(lag_intervals[i+1] - 1)
start_date = model_dates_selected + days(lag_interval_start) # start, i.e. 30 days prior.
end_date = model_dates_selected + days(lag_interval_end) # end, i.e. 0 days prior.
message(glue::glue("lag_interval range ({lag_interval_start}, {lag_interval_end}]: ({start_date}, {end_date}]"))

data |> filter(date >= end_date, date <= start_date) |>
# Note: lags go back in time so the inequality symbols are reversed. Also
# date > start_date makes the range _exclusive_ (start_date, end_date] to avoid
# duplication problems.
arrow::open_dataset(data_files) |>
filter(date > start_date, date <= end_date) |>
collect() |>
select(-source) |>
group_by(x, y, date, doy, month, year) |>
summarize(across(everything(), ~mean(.x, na.rm = T))) |>
mutate(lag_interval = lag_intervals[i])
summarize(across(everything(), ~mean(.x, na.rm = T)), .groups = "drop") |>
mutate(lag_interval_start = lag_interval_start,
lag_interval_end = lag_interval_end)

select(-doy, -month, -year) |> mutate(date = dplyr::lag(date, lag_interval)) |> rename_with(~ paste(.x, "lag", lag_interval, sep = "_"), contains("ndvi")) |> drop_na(date)
}) |> reduce(left_join, by = c("x", "y", "date")) |>
drop_na() |>
rename_with(~gsub("_0", "", .x), contains("_0"))
})

arrow::write_parquet(lagged_data, save_filename, compression = "gzip", compression_level = 5)

save_filename

}
9 changes: 4 additions & 5 deletions R/transform_ecmwf_forecasts.R
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,12 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
# 2. Fix total_precipitation metadata and convert units from m/second to mm/day.
# Note the variable name is total_precipitation but it is really *mean total precipitation rate*
# 3. Correct precip sd
grib_data <- grib_data |> mutate(date = base_date,
mean = ifelse(units == "C", mean - 273.15, ((mean > 0) * 8.64e+7 * mean)),
grib_data <- grib_data |> mutate(mean = ifelse(units == "C", mean - 273.15, ((mean > 0) * 8.64e+7 * mean)),
sd = ifelse(units == "", ((sd > 0) * 8.64e+7 * sd), sd),
var_id = ifelse(units == "", "tprate", var_id),
units = ifelse(units == "", "mm/day", units),
month = as.integer(lubridate::month(date)), # Base month
year = as.integer(lubridate::year(date)), # Base year
month = as.integer(lubridate::month(base_date)), # Base month
year = as.integer(lubridate::year(base_date)), # Base year
lead_month = as.integer(lubridate::month(forecast_end_date - 1)),
lead_year = as.integer(lubridate::year(forecast_end_date - 1)),
variable = fct_recode(variable,
Expand All @@ -145,7 +144,7 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,

# Calculate relative humidity from temperature and dewpoint temperature
grib_data <- grib_data |>
select(x, y, date, month, year, lead_month, lead_year, mean, sd, variable) |>
select(x, y, base_date, month, year, lead_month, lead_year, mean, sd, variable) |>
pivot_wider(names_from = variable, values_from = c(mean, sd), names_glue = "{variable}_{.value}") |> # Reshape to make it easier to calculate composite values like relative humidity
mutate(

Expand Down
114 changes: 90 additions & 24 deletions README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,99 @@ knitr::opts_chunk$set(
[![License: CC-BY-4.0](https://img.shields.io/badge/License (for text)-CC_BY_4.0-blue.svg)](http://creativecommons.org/publicdomain/zero/1.0/)
<!-- badges: end -->

### OpenRVFcast
EcoHealth Alliance's ongoing OpenRVFcast project is developing a generalizable, open-source modeling framework for predicting Rift Valley Fever (RVF) outbreaks in Africa, funded by the Wellcome Trust's climate-sensitive infectious disease [modeling initiative](https://wellcome.org/news/digital-tools-climate-sensitive-infectious-disease). We aim to integrate open data sets of climatic and vegetation data with internationally-reported outbreak data to build an modeling pipeline that can be adapted to varying local conditions in RVF-prone regions across the continent.
# Overview of OpenRVFcast
The goal of EcoHealth Alliance's ongoing OpenRVFcast project the development of a generalizable, open-source modeling framework for predicting Rift Valley Fever (RVF) outbreaks in Africa, funded by the Wellcome Trust's climate-sensitive infectious disease [modeling initiative](https://wellcome.org/news/digital-tools-climate-sensitive-infectious-disease). We aim to integrate open data sets of climatic and vegetation data with internationally-reported outbreak data to build an modeling pipeline that can be adapted to varying local conditions in RVF-prone regions across the continent.

### Repository Structure and Reproducibility
- `data/` contains downloaded and transformed data sources. These data are .gitignored and non-EHA users will need to download the data.
### Pipeline Structure
The project pipeline is organized into two distinct modules: 1) the Data Acquisition Module and 2) the Modeling Framework Module. Both modules are orchestrated using the `targets` package in R, a powerful tool for creating reproducible and efficient data analysis workflows. By defining a workflow of interdependent tasks, known as 'targets', this package ensures that each step in the workflow is only executed when its inputs or code change, thereby optimizing computational efficiency. A modular, scalable, and transparent design makes `targets` an ideal choice for managing pipelines in reproducible research and production environments. An introduction to workflow management using `targets` can be found [here](https://books.ropensci.org/targets/). This project also uses the [{renv}](https://rstudio.github.io/renv/) framework to track R package dependencies and versions which are recorded in the `renv.lock` file. Code used to manage dependencies is in `renv/` and other files in the root project directory. On starting an R session in the working directory, run ``renv::hydrate()` and `renv::restore()` to install required R packags and dependencies.

### Repository Structure
Project code is available on the [open-rvfcast](https://github.com/ecohealthalliance/open-rvfcast) GitHub repository

- `data/` contains downloaded and transformed data sources. These data are .gitignored and are available with access to the EHA open-rvf S3 bucket or the raw data can be download and processed.
- `R/` contains functions used in this analysis.
- `reports/` contains literate code for R Markdown reports generated in the analysis
- `outputs/` contains compiled reports and figures.
- This project uses the [{renv}](https://rstudio.github.io/renv/) framework to
record R package dependencies and versions. Packages and versions used are recorded in `renv.lock` and code used to manage dependencies is in `renv/` and other files in the root project directory. On starting an R session in the working directory, run `renv::restore()` to install R package dependencies.
- This project uses the [{targets}](https://wlandau.github.io/targets-manual/)
framework to organize build steps for analysis pipeline. The schematic figure below summarizes the steps. (The figure is generated using `mermaid.js` syntax and should display as a graph on GitHub. It can also be viewed by pasting the code into <https://mermaid.live>.)



### Data Storage

We utilized parquet files and the `arrow` package in R as our primary method of storing data. Parquet files are optimized for high-performance, out-of-memory data processing, making it well-suited for efficiently handling and processing large, complex datasets. Additionally, `arrow::open_dataset()` supports seamless integration with cloud storage, enabling direct access to remote datasets, which improves workflow efficiency and scalability when working with large, distributed data sources. While the data acquisition module requires the processing of large datasets, the final cleaned data can be accessed directly from the cloud simply via:

```
dataset <- open_dataset("s3://open-rvfcast/data/explanatory_variables")
```

Because parquet files are a columnar format with structured metadata available in each file, some operations, such as filtering, can be applied directly to remote datasets without having to first download the full data. The following will only download the model data for a single day:

```
dataset <- open_dataset("s3://open-rvfcast/data/explanatory_variables") |> filter(date == "2023-12-14") |> collect()
```

Due to the large nature of the data not every day is available - the dataset has been subsetted to two randomly chosen days per month between 2007 and 2024.

## 1. Data Acquisition Module

### Cloud Storage

Many of the computational steps in the first module can be time consuming and either depend on or produce large files. In order to speed up the pipeline, intermediate files can be stored on the cloud for rapid retrieval and portability between pipeline instances. We currently use an AWS [S3 bucket](https://aws.amazon.com/s3/) for this purpose. The pipeline will still run without access to cloud storage but the user can benefit from adapt the `_targets.R` file to use their own object storage repository. AWS access keys and bucket ID are stored in the `.env` file.

### Data Access

Gaining access to the source data stores involves obtaining authentication credentials, such as API keys, tokens, and certificates, to ensure secure communication and data transfer. There are three primary sources of data that require access credentials
1. [ECMWF](https://www.ecmwf.int/): for accessing monthly weather forecasts from the European Centre for Medium-Range Weather Forecasts (ECMWF).
2. [COPERNICUS](https://dataspace.copernicus.eu/): for accessing Normalized Difference Vegetation Index (NDVI) data derived from the European Space Agency's Sentinel-3 satellite.
3. [APPEEARS](https://appeears.earthdatacloud.nasa.gov/api/): for accessing historical NDVI data prior to the Sentinel-3 mission from NASA MODIS satellites.

### Data Sources

#### Static Data

The following data sources are static, or time-invariant. Raw static data was downloaded from the linked sources and joined with dynamic data, such as temperature, which varied by day.

1. [Soil types]():
2. [Aspect]():
3. [Slope]():
4. [Gridded Livestock of the World (GLW)]():
5. [Elevation]():
6. [Bioclimatic data]():
7. [Landcover type]():

#### Dynamic Data

Dynamic data sources are those that vary with time. The following sources make up the dynamic layers

1. [weather_anomalies]()
2. [forecasts_anomalies]()
3. [ndvi_anomalies]()
4. [wahis_outbreak_history]()

#### Temporal Covaraince

In order to isolate the influence of each dynamic predictor, which can be highly conflated with each other due to a shared dependence on time and long-term trends, we used the difference between current values and historical means instead of raw values for dynamic layers. This approach helped mitigate the strong correlation with time that naturally exists in environmental variables like temperature and NDVI. Seasonality was then accounted for by including year and day-of-year (DOY) as predictors in the model.

#### Forecast Dynamic Data


#### Lagged Dynamic Data




#### Historical Outbreak Data




### Targets Pipeline

A visualization of the data acquisition module can be found below. Additional targets not shown are responsible for fetching and storing intermediate datasets on the cloud. To run the data acquisition module, download the repository from github and run the following command. Note, without access to the common S3 bucket store this pipeline will take a significant amount of time and space to run. In addition, without access to the remote data store, the data acquisition module must be run before running the modeling module.

```
tar_make(store = "data_aquisition_targets.R")
```

The schematic figure below summarizes the steps of the data acquisition module. The figure is generated using `mermaid.js` syntax and should display as a graph on GitHub. It can also be viewed by pasting the code into <https://mermaid.live>.)

```{r, echo=FALSE, message = FALSE, results='asis'}
mer <- targets::tar_mermaid(targets_only = TRUE, outdated = FALSE,
legend = FALSE, color = FALSE,
Expand All @@ -53,22 +132,9 @@ cat(
)
```

Many of the computational steps can be time consuming and either depend on or produce large files. In order to speed up the pipeline, intermediate files can be stored on the cloud for rapid retrieval and portability between pipeline instances. We currently use an AWS [S3 bucket](https://aws.amazon.com/s3/) for this purpose. The pipeline will still run without access to cloud storage but the user can benefit from adapt the `_targets.R` file to use their own object storage repository. AWS access keys and bucket ID are stored in the `.env` file.

For handling gridded binary weather data, this pipeline uses the [ecCodes](https://confluence.ecmwf.int/display/ECC) package which can be installed on OSX using `homebrew`:

```
brew install eccodes
```

and on Ubuntu using apt-get

```
sudo apt update
sudo apt install eccodes
```
## 2. Rift Valley Fever (RVF) risk model pipeline

EHA users: see the `stripts/` repository to be able to directly download data from AWS outside of the targets workflow.

Follow the links for more information about:

Expand Down
Loading

0 comments on commit 2fc4228

Please sign in to comment.