More pipeline bug fixes.
n8layman committed Oct 11, 2024
1 parent c17518e commit 7082dce
Showing 9 changed files with 2,712 additions and 1,006 deletions.
Binary file modified: .env (contents not shown)
8 changes: 3 additions & 5 deletions R/create_ndvi_date_lookup.R
@@ -11,13 +11,11 @@
 #' @author Emma Mendelsohn
 #' @export
 create_ndvi_date_lookup <- function(sentinel_ndvi_transformed,
-                                    sentinel_ndvi_transformed_directory,
-                                    modis_ndvi_transformed,
-                                    modis_ndvi_transformed_directory) {
+                                    modis_ndvi_transformed) {
 
   # Connect to Sentinel and Modis datasets
-  sentinel_dataset <- arrow::open_dataset(list.files(sentinel_ndvi_transformed_directory, pattern = "*.parquet", full.names = TRUE))
-  modis_dataset <- arrow::open_dataset(list.files(modis_ndvi_transformed_directory, pattern = "*.parquet", full.names = TRUE))
+  sentinel_dataset <- arrow::open_dataset(sentinel_ndvi_transformed)
+  modis_dataset <- arrow::open_dataset(modis_ndvi_transformed)
 
   # Sentinel dates handling -------------------------------------------------

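The refactor above works because arrow::open_dataset() accepts a character vector of parquet paths directly, so the upstream targets can hand over their tracked files without a list.files() scan of a directory. A minimal sketch under that assumption (the file paths and column name are illustrative, not the repo's actual data):

library(arrow)
library(dplyr)

# Hypothetical parquet files standing in for the sentinel_ndvi_transformed
# target, assumed to be a character vector of per-scene paths.
sentinel_files <- c("data/sentinel_ndvi_transformed/scene_2018_01.parquet",
                    "data/sentinel_ndvi_transformed/scene_2018_02.parquet")

# open_dataset() takes the vector of paths directly.
sentinel_dataset <- open_dataset(sentinel_files)

# Queries stay lazy and on disk until collect().
sentinel_dataset |>
  select(start_date) |>
  distinct() |>
  collect()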
2 changes: 1 addition & 1 deletion R/get_outbreak_history.R
@@ -63,7 +63,7 @@ get_daily_outbreak_history <- function(dates_df,

   # Check if the outbreak_history file already exists and can be read, and that we don't want to overwrite it.
   if(!is.null(error_safe_read_parquet(outbreak_history_filename)) & !overwrite & year != year(Sys.time())) {
-    message("preprocessed landcover parquet file already exists and can be loaded, skipping download and processing")
+    message("preprocessed outbreak history parquet file already exists and can be loaded, skipping download and processing")
     return(outbreak_history_filename)
   }

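The cache check above relies on error_safe_read_parquet(), which is defined elsewhere in the repo and not shown in this diff. A plausible sketch, assuming it simply wraps arrow::read_parquet() so a missing or corrupt file yields NULL rather than an error:

library(arrow)
library(purrr)

# Assumed helper: read a parquet file, returning NULL instead of throwing
# when the file is absent or unreadable, so callers can test with is.null().
error_safe_read_parquet <- possibly(read_parquet, otherwise = NULL)

Under that assumption, the guard skips re-processing only when a readable file already exists, overwrite is FALSE, and the year being processed is already complete.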
44 changes: 29 additions & 15 deletions R/get_outbreak_history_animation.R
@@ -25,6 +25,7 @@
 get_outbreak_history_animation <- function(wahis_outbreak_history,
                                            wahis_outbreak_history_animations_directory,
                                            num_cores = 1,
+                                           overwrite = FALSE,
                                            ...) {
 
   output_basename = tools::file_path_sans_ext(basename(wahis_outbreak_history))
@@ -35,29 +36,36 @@ get_outbreak_history_animation <- function(wahis_outbreak_history,
 
   # Fastest way to use parquet files is to do as many operations as possible before collect or pull
   min_weight <- outbreak_history_dataset |>
     summarise(min_weight = min(weight, na.rm = TRUE)) |>
-    pull(min_weight)
+    pull(min_weight, as_vector = TRUE)
 
   max_weight <- outbreak_history_dataset |>
     summarise(max_weight = max(weight, na.rm = TRUE)) |>
-    pull(max_weight)
+    pull(max_weight, as_vector = TRUE)
 
   # Identify limits (used to calibrate the color scale)
   lims <- c(min_weight, max_weight)
 
   # Keep as much as possible out of memory and on disk. Arrow permits fast access.
-  time_frames <- outbreak_history_dataset |> select(time_frame) |> distinct() |> pull(time_frame)
+  time_frames <- outbreak_history_dataset |> select(time_frame) |> distinct() |> pull(time_frame, as_vector = TRUE)
+
+  # Create temporary directory if it does not yet exist
+  tmp_dir <- paste(wahis_outbreak_history_animations_directory, output_basename, sep = "/")
+  dir.create(tmp_dir, recursive = TRUE, showWarnings = FALSE)
 
   # Make animations for both recent and old outbreaks
-  map(time_frames, function(tf) {
+  output_files <- map(time_frames, function(tf) {
 
-    output_filename = file.path(wahis_outbreak_history_animations_directory, glue::glue("{output_basename}_{tf}.gif"))
-    message(paste("Animating", basename(output_filename)))
-
-    dates <- outbreak_history_dataset |> filter(time_frame == tf) |> select(date) |> distinct() |> pull(date)
-
-    # Create temporary directory if it does not yet exist
-    tmp_dir <- paste(wahis_outbreak_history_animations_directory, output_basename, sep = "/")
-    dir.create(tmp_dir, recursive = TRUE, showWarnings = FALSE)
+    output_filename = file.path(wahis_outbreak_history_animations_directory, glue::glue("{basename(output_basename)}_{tf}.gif"))
+
+    # Check if the output file already exists and we don't want to overwrite it.
+    if(file.exists(output_filename) & !overwrite) {
+      message(glue::glue("{basename(output_filename)} already exists. Skipping."))
+      return(output_filename)
+    }
+
+    message(paste("Animating", basename(output_filename)))
+
+    dates <- outbreak_history_dataset |> filter(time_frame == tf) |> select(date) |> distinct() |> pull(date, as_vector = TRUE)
 
     # This function makes a png for each date which will then get stitched together
     # into the animation. Saving each png is faster than trying to do everything
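The repeated as_vector = TRUE additions in this hunk pin down the return type of pull(): on an arrow query, pull() may return an Arrow ChunkedArray rather than a plain R vector, depending on the arrow version and the arrow.pull_as_vector option. A minimal sketch, with a toy table and an assumed weight column standing in for the real outbreak history dataset:

library(arrow)
library(dplyr)

# Toy parquet file standing in for outbreak_history_dataset.
pq <- tempfile(fileext = ".parquet")
write_parquet(data.frame(weight = c(0.2, 0.8, NA)), pq)
ds <- open_dataset(pq)

# as_vector = TRUE guarantees a plain R vector, which is what
# c(min_weight, max_weight) and the color-scale limits expect downstream.
min_weight <- ds |>
  summarise(min_weight = min(weight, na.rm = TRUE)) |>
  pull(min_weight, as_vector = TRUE)

min_weight  # a length-1 numeric vector, not a ChunkedArray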
@@ -67,9 +75,10 @@ get_outbreak_history_animation <- function(wahis_outbreak_history,
                        function(d) plot_outbreak_history(outbreak_history_dataset |>
                                                            filter(date == d) |>
                                                            filter(time_frame == tf) |>
-                                                           collect(),
-                                                         tmp_dir = tmp_dir,
-                                                         lims = lims)) |>
+                                                           collect(),
+                                                         tmp_dir = tmp_dir,
+                                                         write_frame = TRUE,
+                                                         lims = lims)) |>
       unlist() |> sort()
 
     # Add a delay at the end before looping back to the beginning. This is in frames, not seconds
@@ -81,12 +90,17 @@ get_outbreak_history_animation <- function(wahis_outbreak_history,
                    gif_file = output_filename)
 
     # Clean up temporary files
-    unlink(tmp_dir, recursive = T)
+    file.remove(png_files)
+
+    # Return the location of the rendered animation
+    output_filename
   }
   )
+
+  # Clean up temporary files
+  unlink(tmp_dir, recursive = T)
+
+  return(output_files)
 }

#' Plot Outbreak History
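The new overwrite argument effectively turns each animation branch into a cache: a finished gif short-circuits the expensive render-and-stitch step. A condensed sketch of the guard, where render_animation_frames() is a hypothetical stand-in for the png and gifski steps above:

library(glue)

render_or_skip <- function(output_filename, overwrite = FALSE) {
  # Skip re-rendering when the gif already exists and overwrite is FALSE;
  # returning the path either way keeps the map() output uniform.
  if (file.exists(output_filename) && !overwrite) {
    message(glue("{basename(output_filename)} already exists. Skipping."))
    return(output_filename)
  }
  render_animation_frames(output_filename)  # hypothetical: save pngs, then gifski::gifski()
  output_filename
}

(The scalar && here behaves the same as the diff's & for length-one logicals.)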
25 changes: 8 additions & 17 deletions _targets.R
@@ -294,7 +294,6 @@ dynamic_targets <- tar_plan(
                              wahis_outbreak_history_AWS), # Enforce Dependency
              pattern = map(wahis_outbreak_dates),
              error = "null", # Keep going if error. It will be caught next time the pipeline is run.
-
              format = "file",
              repository = "local"),

Expand All @@ -320,12 +319,11 @@ dynamic_targets <- tar_plan(
wahis_outbreak_history_animations_directory), # Just included to enforce dependency with wahis_outbreak_history
pattern = map(wahis_outbreak_history),
error = "null",
format = "file",
repository = "local"),
#
# tar_target(wahis_outbreak_history_animations_AWS_upload, AWS_put_files(wahis_outbreak_history_animations,
# wahis_outbreak_history_animations_directory),
# error = "null"), # Continue the pipeline even on error

tar_target(wahis_outbreak_history_animations_AWS_upload, AWS_put_files(wahis_outbreak_history_animations,
wahis_outbreak_history_animations_directory),
error = "null"), # Continue the pipeline even on error

# SENTINEL NDVI -----------------------------------------------------------
# 2018-present
@@ -378,11 +376,8 @@ dynamic_targets <- tar_plan(
   tar_target(modis_ndvi_bundle_request, submit_modis_ndvi_bundle_request(modis_ndvi_token,
                                                                          modis_ndvi_task_id_continent,
                                                                          modis_ndvi_bundle_request_file) |>
-               rowwise() |>
-               tar_group(),
-             cue = tar_cue("always"),
-             iteration = "group"
-  ),
+               filter(grepl("NDVI", file_name)),
+             cue = tar_cue("always")),

# Check if modis_ndvi files already exists on AWS and can be loaded
# The only important one is the directory. The others are there to enforce dependencies.
@@ -623,12 +618,8 @@ data_targets <- tar_plan(
 
 
   # ndvi anomalies --------------------------------------------------
-  tar_target(ndvi_date_lookup,
-             create_ndvi_date_lookup(sentinel_ndvi_transformed,
-                                     sentinel_ndvi_transformed_directory,
-                                     modis_ndvi_transformed,
-                                     modis_ndvi_transformed_directory)),
-
+  tar_target(ndvi_date_lookup, create_ndvi_date_lookup(sentinel_ndvi_transformed,
+                                                       modis_ndvi_transformed)),
 
   tar_target(ndvi_historical_means_directory,
              create_data_directory(directory_path = "data/ndvi_historical_means")),
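The _targets.R changes lean on three tar_target() options used throughout this plan: pattern = map() to branch once per upstream element, error = "null" so a failed branch records NULL and is retried on the next run instead of halting the pipeline, and format = "file" so targets tracks the returned path by file hash. A minimal sketch with illustrative names (list_history_files() and render_animation() are hypothetical helpers that return file paths, not the repo's actual functions):

library(targets)

list(
  tar_target(history_files,
             list_history_files(),
             format = "file"),
  tar_target(animation_gifs,
             render_animation(history_files),
             pattern = map(history_files),   # one branch per tracked file
             error = "null",                 # failed branch yields NULL, retried next run
             format = "file",                # track the returned path by content hash
             repository = "local")           # keep the artifact in the local store
)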
