diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml new file mode 100644 index 0000000..d67ffba --- /dev/null +++ b/.github/workflows/bench.yml @@ -0,0 +1,21 @@ +--- +name: Task - Benchmarks + +on: + workflow_dispatch: + workflow_call: + +jobs: + cargo-bench: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + # selecting a toolchain either by action or manual `rustup` calls should happen + # before the plugin, as the cache uses the current rustc version as its cache key + - run: rustup show + + - uses: Swatinem/rust-cache@v2 + - name: Benchmarks + run: | + cargo bench diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 93081bc..d5d54e0 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -21,3 +21,8 @@ jobs: name: Run Cargo linters uses: ./.github/workflows/linters-cargo.yml needs: rust_build + + bench: + name: Run benchmarks + uses: ./.github/workflows/bench.yml + needs: rust_build diff --git a/Cargo.lock b/Cargo.lock index dcab050..e694862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -52,6 +52,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + [[package]] name = "arc-swap" version = "1.6.0" @@ -264,6 +276,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.83" @@ -294,6 +312,33 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -304,6 +349,31 @@ dependencies = [ "inout", ] +[[package]] +name = "clap" +version = "4.5.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -347,6 +417,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -356,6 +464,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -566,6 +684,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "either" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -899,6 +1023,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1153,6 +1287,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -1417,6 +1560,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "openssl" version = "0.10.61" @@ -1617,6 +1766,34 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "postgres-protocol" version = "0.6.6" @@ -1676,6 +1853,7 @@ dependencies = [ "axum-macros", "bigdecimal", "chrono", + "criterion", "deadpool", "diesel", "diesel-async", @@ -1849,6 +2027,26 @@ dependencies = [ "bitflags 2.4.1", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2091,6 +2289,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.22" @@ -2742,6 +2949,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3085,6 +3302,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 65cd909..705e294 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,3 +50,8 @@ uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] } [dev-dependencies] rstest = "0.18.2" +criterion = { version = "0.5", features = ["async_tokio"] } + +[[bench]] +name = "coingecko_benchmarks" +harness = false diff --git a/benches/coingecko_benchmarks.rs b/benches/coingecko_benchmarks.rs new file mode 100644 index 0000000..694c082 --- /dev/null +++ b/benches/coingecko_benchmarks.rs @@ -0,0 +1,58 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use pragma_monitoring::constants::COINGECKO_IDS; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::runtime::Runtime; + +// Create a mock initialization function instead of calling the API +async fn initialize_test_mappings() { + let mut test_mappings = HashMap::new(); + // Add just a few test pairs + test_mappings.insert("BTC/USD".to_string(), "bitcoin".to_string()); + test_mappings.insert("ETH/USD".to_string(), "ethereum".to_string()); + test_mappings.insert("SOL/USD".to_string(), "solana".to_string()); + + COINGECKO_IDS.store(Arc::new(test_mappings)); +} + +fn criterion_benchmark(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + // Initialize with test data + rt.block_on(async { + initialize_test_mappings().await; + }); + + let mut group = c.benchmark_group("coingecko_operations"); + + // Benchmark simple lookup + group.bench_function("single_lookup", |b| { + b.iter(|| { + let mappings = COINGECKO_IDS.load(); + black_box(mappings.get("BTC/USD").cloned()) + }); + }); + + // Benchmark concurrent lookups with smaller load + group.bench_function("concurrent_lookups", |b| { + b.iter(|| { + rt.block_on(async { + let handles: Vec<_> = (0..3) // Reduced to just 3 concurrent lookups + .map(|_| { + tokio::spawn(async { + let mappings = COINGECKO_IDS.load(); + black_box(mappings.get("BTC/USD").cloned()) + }) + }) + .collect(); + + futures::future::join_all(handles).await + }); + }); + }); + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/src/coingecko.rs b/src/coingecko.rs new file mode 100644 index 0000000..17b8da8 --- /dev/null +++ b/src/coingecko.rs @@ -0,0 +1,62 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Coin { + id: String, + symbol: String, +} + +fn to_usd_pair(symbol: String) -> String { + format!("{}/USD", symbol) +} + +#[derive(Debug, thiserror::Error)] +pub enum CoinGeckoError { + #[error("Empty response from CoinGecko API")] + EmptyResponse, + #[error("Coingecko Coins API request failed with status: {0}")] + RequestFailed(String), + #[error("Coingecko Coins Failed to parse response: {0}")] + ParseError(#[from] reqwest::Error), +} + +pub async fn get_coingecko_mappings() -> Result, CoinGeckoError> { + let client = reqwest::Client::new(); + let mut mappings = HashMap::new(); + let mut page = 1; + + loop { + let url = format!( + "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}", + page + ); + + let response = client + .get(&url) + .header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36") + .send() + .await + .map_err(CoinGeckoError::ParseError)?; + + if !response.status().is_success() { + return Err(CoinGeckoError::RequestFailed(response.status().to_string())); + } + + let coins: Vec = response.json().await?; + if coins.is_empty() { + if page == 1 { + return Err(CoinGeckoError::EmptyResponse); + } + break; + } + + coins.into_iter().for_each(|coin| { + mappings.insert(to_usd_pair(coin.symbol.to_uppercase()), coin.id); + }); + + page += 1; + } + + Ok(mappings) +} diff --git a/src/constants.rs b/src/constants.rs index de3cade..307b325 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,26 +1,69 @@ -use std::collections::HashMap; - +use arc_swap::ArcSwap; use lazy_static::lazy_static; -use phf::phf_map; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use crate::coingecko::get_coingecko_mappings; pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6; -#[allow(unused)] -pub(crate) static COINGECKO_IDS: phf::Map<&'static str, &'static str> = phf_map! { - "BTC/USD" => "bitcoin", - "ETH/USD" => "ethereum", - "LUSD/USD" => "liquity-usd", - "WBTC/USD" => "wrapped-bitcoin", - "DAI/USD" => "dai", - "USDC/USD" => "usd-coin", - "USDT/USD" => "tether", - "WSTETH/USD" => "wrapped-steth", - "LORDS/USD" => "lords", - "STRK/USD" => "starknet", - "ZEND/USD" => "zklend-2", - "NSTR/USD" => "nostra", -}; +lazy_static! { + pub static ref COINGECKO_IDS: ArcSwap> = + ArcSwap::new(Arc::new(HashMap::new())); +} + +const MAX_RETRIES: u32 = 3; +const RETRY_DELAY: Duration = Duration::from_secs(2); + +#[allow(dead_code)] +pub async fn initialize_coingecko_mappings() { + let mappings = retry_with_backoff(get_coingecko_mappings) + .await + .unwrap_or_else(|e| { + tracing::error!( + "Failed to initialize CoinGecko mappings after {} retries: {}", + MAX_RETRIES, + e + ); + panic!("Cannot start monitoring without CoinGecko mappings: {}", e); + }); + + COINGECKO_IDS.store(Arc::new(mappings)); + tracing::info!("Successfully initialized CoinGecko mappings"); +} + +async fn retry_with_backoff(f: F) -> Result +where + F: Fn() -> Fut, + Fut: Future>, + E: std::fmt::Display, +{ + let mut attempts = 0; + let mut last_error = None; + + while attempts < MAX_RETRIES { + match f().await { + Ok(result) => return Ok(result), + Err(e) => { + attempts += 1; + last_error = Some(e); + if attempts < MAX_RETRIES { + tracing::warn!( + "Attempt {} failed, retrying after {} seconds", + attempts, + RETRY_DELAY.as_secs() + ); + sleep(RETRY_DELAY).await; + } + } + } + } + + Err(last_error.unwrap()) +} lazy_static! { /// TODO: Current storage of long tail assets here is not really good. diff --git a/src/lib.rs b/src/lib.rs index 6a36dce..9ba75c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ +pub mod coingecko; pub(crate) mod config; -pub(crate) mod constants; +pub mod constants; pub mod models; pub mod schema; pub mod types; diff --git a/src/main.rs b/src/main.rs index 69d15ad..f680aaa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,6 +21,8 @@ mod constants; mod types; // Utils mod utils; +// coingecko +mod coingecko; #[cfg(test)] mod tests; @@ -39,6 +41,7 @@ use tokio::task::JoinHandle; use tokio::time::interval; use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType}; +use constants::initialize_coingecko_mappings; use processing::common::{check_publisher_balance, data_indexers_are_synced}; use tracing::instrument; use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results}; @@ -51,6 +54,9 @@ struct MonitoringTask { #[tokio::main] async fn main() { + // Initialize CoinGecko mappings + initialize_coingecko_mappings().await; + // Start configuring a `fmt` subscriber let subscriber = tracing_subscriber::fmt() .compact() @@ -122,11 +128,23 @@ async fn spawn_monitoring_tasks( name: "VRF Monitoring".to_string(), handle: tokio::spawn(vrf_monitor(pool.clone())), }, + MonitoringTask { + name: "CoinGecko Mappings Update".to_string(), + handle: tokio::spawn(periodic_coingecko_update()), + }, ]; tasks } +async fn periodic_coingecko_update() { + let mut interval = interval(Duration::from_secs(86400)); // 1 day + loop { + interval.tick().await; + initialize_coingecko_mappings().await; + } +} + #[instrument] async fn handle_task_results(tasks: Vec) { let mut results = HashMap::new(); diff --git a/src/monitoring/on_off_deviation.rs b/src/monitoring/on_off_deviation.rs index 9f5e979..953cf2c 100644 --- a/src/monitoring/on_off_deviation.rs +++ b/src/monitoring/on_off_deviation.rs @@ -38,7 +38,7 @@ pub async fn on_off_price_deviation( data_type: DataType, cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result<(f64, u32), MonitoringError> { - let ids = &COINGECKO_IDS; + let ids = COINGECKO_IDS.load(); let config = get_config(None).await; let client = &config.network().provider; let field_pair = cairo_short_string_to_felt(&pair_id).expect("failed to convert pair id"); @@ -81,7 +81,10 @@ pub async fn on_off_price_deviation( let (deviation, num_sources_aggregated) = match data_type { DataType::Spot => { - let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id"); + let coingecko_id = ids + .get(&pair_id) + .expect("Failed to get coingecko id") + .clone(); let coins_prices = query_defillama_api(timestamp, coingecko_id.to_owned(), cache).await?; diff --git a/src/monitoring/price_deviation.rs b/src/monitoring/price_deviation.rs index 5173f3b..5bbeb3e 100644 --- a/src/monitoring/price_deviation.rs +++ b/src/monitoring/price_deviation.rs @@ -50,10 +50,13 @@ pub async fn price_deviation( normalized_price: f64, cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { - let ids = &COINGECKO_IDS; + let ids = COINGECKO_IDS.load(); let pair_id = query.pair_id().to_string(); - let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id"); + let coingecko_id = ids + .get(&pair_id) + .expect("Failed to get coingecko id") + .clone(); let coins_prices = query_defillama_api( query.timestamp().timestamp().try_into().unwrap(), @@ -82,9 +85,12 @@ pub async fn raw_price_deviation( price: f64, cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { - let ids = &COINGECKO_IDS; + let ids = COINGECKO_IDS.load(); - let coingecko_id = *ids.get(pair_id).expect("Failed to get coingecko id"); + let coingecko_id = ids + .get(pair_id) + .expect("Failed to get coingecko id") + .clone(); let coins_prices = query_defillama_api( chrono::Utc::now().timestamp().try_into().unwrap(),