-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy path_targets.R
109 lines (91 loc) · 3.14 KB
/
_targets.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Load packages required to define the pipeline:
library(targets)
library(tidyr)
library(tibble)
library(here)
library(tarchetypes)
# Set target options:
tar_option_set(
packages = c(
"tibble",
"ncdf4",
"readr",
"dplyr",
"rLakeAnalyzer",
"pracma",
"LakeMetabolizer",
"deSolve",
"tidyr",
"lubridate",
"ggplot2",
"stringr",
"here"
), # packages that your targets need to run
# format = "rds" # default storage format
format = "qs" # default storage format
)
tar_config_set(store = "~/scratch/isioxy/") # Folder for target's internal data storage
tar_option_set(storage = "worker", retrieval = "worker") # let workers store and retrieve data directely
# Slurm configs
tar_option_set(
resources = tar_resources(
clustermq = tar_resources_clustermq(template = list(memory = "2G", time = "01:00:00"))
)
)
# tar_make_clustermq() configuration:
# options(clustermq.scheduler = "multicore") # parallel processing on local machine
options(clustermq.scheduler = "slurm") # Slurm on HPC
options(clustermq.template = "clustermq.tmpl") # Slurm sbatch template
# source required functions
source("R/thermal_script.R")
source("R/thermocline_helper.R")
source("R/oxygen_helper.R")
# settings for computations:
lake_folder <- "ObsDOTest" # Folder containing isimip results
numit <- 1000 # number of iterations for oxygen model
stratification_batches <- 5 # Number of batches of stratification events per lake
# Total number of targets for computation: lakes * batches
# target list:
# lakes <- expand_grid(
# lake_id = list.files(here(lake_folder), full.names = F)[1],
# trophy = c("oligo", "eutro")
# )
lakes <- tibble(
lake_id = list.files(here(lake_folder), full.names = F)
)
glob_trophy <- tar_target(trophy, c("oligo", "eutro"))
glob_params <- tar_target(oxy_params, get_prior(trophy, n = numit), pattern = trophy)
targets <- tar_map(
values = lakes,
#process thermal module per lake
tar_target(thermal, thermal_info(here(lake_folder, lake_id))),
# batch computation based on stratification events. Number of batches defined by 'stratification_batches'
tar_group_count(
thermal_to_model,
tibble(data = thermal %>%
filter(!is.na(stratified)) %>%
filter(duration > 2) %>%
group_split(strat_id)),
count = stratification_batches
),
# run oxygen model
tar_target(oxygen,
run_oxygen_model(
thermal_to_model,
method = "rk4",
trophy = trophy,
iterations = numit,
params = oxy_params
),
pattern = cross(thermal_to_model, trophy)
),
# store results of oxygen model in results folder
tar_target(write_oxygen, save_model_output(oxygen, lake_id), format = "file"),
# load observations (Abby's lakes)
tar_target(observations, read_observations(lake_id)),
# Create QC plots for oxygen
tar_target(plot_qc_oxy, save_qc_plot_oxygen(oxygen, lake_id, observations), format = "file"),
# Create plots of temperature and thermocline depth
tar_target(plot_thermal, create_plots_thermal(thermal, lake_id, lake_folder), format = "file")
)
list(glob_trophy, glob_params, targets)