Skip to content

Commit

Permalink
feat: add memory cache for defillama
Browse files Browse the repository at this point in the history
  • Loading branch information
EvolveArt committed Nov 23, 2024
1 parent 28fd700 commit bd3b461
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 22 deletions.
132 changes: 132 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ env_logger = "0.10.1"
futures = "0.3.28"
hyper = "0.14.27"
lazy_static = "1.4.0"
moka = { version = "0.12.8", features = ["future"] }
num-bigint = "0.4"
phf = { version = "0.11", features = ["macros"] }
prometheus = "0.13.3"
Expand Down
38 changes: 33 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ use deadpool::managed::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use dotenv::dotenv;
use moka::future::Cache;
use monitoring::price_deviation::CoinPricesDTO;
use tokio::task::JoinHandle;
use tokio::time::interval;

use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
use processing::common::{check_publisher_balance, data_indexers_are_synced};
use tracing::instrument;
use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};

#[derive(Debug)]
struct MonitoringTask {
name: String,
handle: JoinHandle<()>,
Expand Down Expand Up @@ -77,29 +81,42 @@ async fn main() {
handle_task_results(monitoring_tasks).await;
}

#[instrument(skip_all)]
async fn spawn_monitoring_tasks(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
) -> Vec<MonitoringTask> {
let cache = Cache::new(10_000);

let tasks = vec![
MonitoringTask {
name: "Config Update".to_string(),
handle: tokio::spawn(periodic_config_update()),
},
MonitoringTask {
name: "Spot Monitoring".to_string(),
handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot)),
handle: tokio::spawn(onchain_monitor(
pool.clone(),
true,
&DataType::Spot,
cache.clone(),
)),
},
MonitoringTask {
name: "Future Monitoring".to_string(),
handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)),
handle: tokio::spawn(onchain_monitor(
pool.clone(),
true,
&DataType::Future,
cache.clone(),
)),
},
MonitoringTask {
name: "Publisher Monitoring".to_string(),
handle: tokio::spawn(publisher_monitor(pool.clone(), false)),
},
MonitoringTask {
name: "API Monitoring".to_string(),
handle: tokio::spawn(api_monitor()),
handle: tokio::spawn(api_monitor(cache.clone())),
},
MonitoringTask {
name: "VRF Monitoring".to_string(),
Expand All @@ -110,6 +127,7 @@ async fn spawn_monitoring_tasks(
tasks
}

#[instrument]
async fn handle_task_results(tasks: Vec<MonitoringTask>) {
let mut results = HashMap::new();
for task in tasks {
Expand All @@ -119,7 +137,8 @@ async fn handle_task_results(tasks: Vec<MonitoringTask>) {
log_monitoring_results(results);
}

pub(crate) async fn api_monitor() {
#[instrument(skip(cache))]
pub(crate) async fn api_monitor(cache: Cache<(String, u64), CoinPricesDTO>) {
let monitoring_config = get_config(None).await;
tracing::info!("[API] Monitoring API..");

Expand All @@ -132,13 +151,14 @@ pub(crate) async fn api_monitor() {
.sources(DataType::Spot)
.iter()
.flat_map(|(pair, sources)| {
let my_cache = cache.clone();
if is_long_tail_asset(pair) {
vec![tokio::spawn(Box::pin(
processing::api::process_long_tail_assets(pair.clone(), sources.clone()),
))]
} else {
vec![tokio::spawn(Box::pin(
processing::api::process_data_by_pair(pair.clone()),
processing::api::process_data_by_pair(pair.clone(), my_cache),
))]
}
})
Expand All @@ -153,10 +173,12 @@ pub(crate) async fn api_monitor() {
}
}

#[instrument(skip(pool, cache))]
pub(crate) async fn onchain_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
data_type: &DataType,
cache: Cache<(String, u64), CoinPricesDTO>,
) {
let monitoring_config = get_config(None).await;

Expand Down Expand Up @@ -188,12 +210,14 @@ pub(crate) async fn onchain_monitor(
tokio::spawn(Box::pin(processing::spot::process_data_by_pair(
pool.clone(),
pair.clone(),
cache.clone(),
))),
tokio::spawn(Box::pin(
processing::spot::process_data_by_pair_and_sources(
pool.clone(),
pair.clone(),
sources.to_vec(),
cache.clone(),
),
)),
]
Expand All @@ -205,12 +229,14 @@ pub(crate) async fn onchain_monitor(
tokio::spawn(Box::pin(processing::future::process_data_by_pair(
pool.clone(),
pair.clone(),
cache.clone(),
))),
tokio::spawn(Box::pin(
processing::future::process_data_by_pair_and_sources(
pool.clone(),
pair.clone(),
sources.to_vec(),
cache.clone(),
),
)),
]
Expand All @@ -223,6 +249,7 @@ pub(crate) async fn onchain_monitor(
}
}

#[instrument(skip(pool))]
pub(crate) async fn publisher_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
Expand Down Expand Up @@ -266,6 +293,7 @@ pub(crate) async fn publisher_monitor(
}
}

#[instrument(skip(pool))]
pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
tracing::info!("[VRF] Monitoring VRF requests..");

Expand Down
Loading

0 comments on commit bd3b461

Please sign in to comment.