From 0d5e3e1335e81c3938eb70da606163d72276d33f Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Thu, 12 Dec 2024 14:04:45 +0100 Subject: [PATCH 1/7] :sparkles: implement CoinGecko API integration for dynamic coin mappings --- src/constants.rs | 105 ++++++++++++++++++++++++----- src/main.rs | 16 +++++ src/monitoring/on_off_deviation.rs | 7 +- src/monitoring/price_deviation.rs | 14 ++-- 4 files changed, 118 insertions(+), 24 deletions(-) diff --git a/src/constants.rs b/src/constants.rs index de3cade..a6f85b3 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,26 +1,95 @@ -use std::collections::HashMap; - +use arc_swap::ArcSwap; use lazy_static::lazy_static; -use phf::phf_map; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, error::Error}; pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6; -#[allow(unused)] -pub(crate) static COINGECKO_IDS: phf::Map<&'static str, &'static str> = phf_map! { - "BTC/USD" => "bitcoin", - "ETH/USD" => "ethereum", - "LUSD/USD" => "liquity-usd", - "WBTC/USD" => "wrapped-bitcoin", - "DAI/USD" => "dai", - "USDC/USD" => "usd-coin", - "USDT/USD" => "tether", - "WSTETH/USD" => "wrapped-steth", - "LORDS/USD" => "lords", - "STRK/USD" => "starknet", - "ZEND/USD" => "zklend-2", - "NSTR/USD" => "nostra", -}; +// Define the Coin struct to store the id and symbol of the coin. +#[derive(Debug, Serialize, Deserialize)] +pub struct Coin { + id: String, + symbol: String, +} + +// We already have the Link for the CoinGecko API, so we can use it to fetch the data. +// We will use the CoinGecko API to fetch the data. +// Example: https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page=1 +// We will fetch the data for the first 100 coins. +// here is the data how its look like: +// [ +// { +// "id": "bitcoin", +// "symbol": "btc", +// "name": "Bitcoin", +// "image": "https://coin-images.coingecko.com/coins/images/1/large/bitcoin.png?1696501400", +// "current_price": 100390, +// ... +// }, +// ... +// ] +#[allow(dead_code)] +async fn get_coingecko_mappings() -> Result, Box> { + let client = reqwest::Client::new(); + let mut mappings = HashMap::new(); + let mut page = 1; + + loop { + let url = format!( + "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}", + page + ); + + let response = client + .get(&url) + .header("User-Agent", "Crypto Data Fetcher") + .send() + .await?; + + if !response.status().is_success() { + return Err(format!("API request failed with status: {}", response.status()).into()); + } + + let coins: Vec = response.json().await?; + if coins.is_empty() { + break; + } + + for coin in coins { + // Convert symbol to uppercase and create pair format + let pair = format!("{}/USD", coin.symbol.to_uppercase()); + mappings.insert(pair, coin.id); + } + + page += 1; + } + + Ok(mappings) +} + +// Replace the static phf_map with a lazy_static ArcSwap +lazy_static! { + pub static ref COINGECKO_IDS: ArcSwap> = + ArcSwap::new(Arc::new(HashMap::new())); +} + +#[allow(dead_code)] +pub async fn initialize_coingecko_mappings() { + match get_coingecko_mappings().await { + Ok(mappings) => { + COINGECKO_IDS.store(Arc::new(mappings)); + tracing::info!("Successfully initialized CoinGecko mappings"); + } + Err(e) => { + tracing::error!("Failed to initialize CoinGecko mappings: {}", e); + // You might want to panic here depending on how critical this is + // panic!("Failed to initialize CoinGecko mappings: {}", e); + } + } +} lazy_static! { /// TODO: Current storage of long tail assets here is not really good. diff --git a/src/main.rs b/src/main.rs index 69d15ad..714a49a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,6 +39,7 @@ use tokio::task::JoinHandle; use tokio::time::interval; use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType}; +use constants::initialize_coingecko_mappings; use processing::common::{check_publisher_balance, data_indexers_are_synced}; use tracing::instrument; use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results}; @@ -51,6 +52,9 @@ struct MonitoringTask { #[tokio::main] async fn main() { + // Initialize CoinGecko mappings + initialize_coingecko_mappings().await; + // Start configuring a `fmt` subscriber let subscriber = tracing_subscriber::fmt() .compact() @@ -122,11 +126,23 @@ async fn spawn_monitoring_tasks( name: "VRF Monitoring".to_string(), handle: tokio::spawn(vrf_monitor(pool.clone())), }, + MonitoringTask { + name: "CoinGecko Mappings Update".to_string(), + handle: tokio::spawn(periodic_coingecko_update()), + }, ]; tasks } +async fn periodic_coingecko_update() { + let mut interval = interval(Duration::from_secs(86400)); // 1 day + loop { + interval.tick().await; + initialize_coingecko_mappings().await; + } +} + #[instrument] async fn handle_task_results(tasks: Vec) { let mut results = HashMap::new(); diff --git a/src/monitoring/on_off_deviation.rs b/src/monitoring/on_off_deviation.rs index 9f5e979..953cf2c 100644 --- a/src/monitoring/on_off_deviation.rs +++ b/src/monitoring/on_off_deviation.rs @@ -38,7 +38,7 @@ pub async fn on_off_price_deviation( data_type: DataType, cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result<(f64, u32), MonitoringError> { - let ids = &COINGECKO_IDS; + let ids = COINGECKO_IDS.load(); let config = get_config(None).await; let client = &config.network().provider; let field_pair = cairo_short_string_to_felt(&pair_id).expect("failed to convert pair id"); @@ -81,7 +81,10 @@ pub async fn on_off_price_deviation( let (deviation, num_sources_aggregated) = match data_type { DataType::Spot => { - let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id"); + let coingecko_id = ids + .get(&pair_id) + .expect("Failed to get coingecko id") + .clone(); let coins_prices = query_defillama_api(timestamp, coingecko_id.to_owned(), cache).await?; diff --git a/src/monitoring/price_deviation.rs b/src/monitoring/price_deviation.rs index 5173f3b..5bbeb3e 100644 --- a/src/monitoring/price_deviation.rs +++ b/src/monitoring/price_deviation.rs @@ -50,10 +50,13 @@ pub async fn price_deviation( normalized_price: f64, cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { - let ids = &COINGECKO_IDS; + let ids = COINGECKO_IDS.load(); let pair_id = query.pair_id().to_string(); - let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id"); + let coingecko_id = ids + .get(&pair_id) + .expect("Failed to get coingecko id") + .clone(); let coins_prices = query_defillama_api( query.timestamp().timestamp().try_into().unwrap(), @@ -82,9 +85,12 @@ pub async fn raw_price_deviation( price: f64, cache: Cache<(String, u64), CoinPricesDTO>, ) -> Result { - let ids = &COINGECKO_IDS; + let ids = COINGECKO_IDS.load(); - let coingecko_id = *ids.get(pair_id).expect("Failed to get coingecko id"); + let coingecko_id = ids + .get(pair_id) + .expect("Failed to get coingecko id") + .clone(); let coins_prices = query_defillama_api( chrono::Utc::now().timestamp().try_into().unwrap(), From 8532b1c5b89cf3de7e7a08f1df7af0649abcd0e0 Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Thu, 12 Dec 2024 14:37:15 +0100 Subject: [PATCH 2/7] :recycle: add benchmarking workflow and implement CoinGecko benchmarks --- .github/workflows/bench.yml | 21 +++ .github/workflows/pull-request.yml | 5 + Cargo.lock | 229 ++++++++++++++++++++++++++++- Cargo.toml | 5 + benches/coingecko_benchmarks.rs | 53 +++++++ src/lib.rs | 2 +- 6 files changed, 313 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/bench.yml create mode 100644 benches/coingecko_benchmarks.rs diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml new file mode 100644 index 0000000..d5690e8 --- /dev/null +++ b/.github/workflows/bench.yml @@ -0,0 +1,21 @@ +--- +name: Task - Benchmarks + +on: + workflow_dispatch: + workflow_call: + +jobs: + cargo-lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + # selecting a toolchain either by action or manual `rustup` calls should happen + # before the plugin, as the cache uses the current rustc version as its cache key + - run: rustup show + + - uses: Swatinem/rust-cache@v2 + - name: Benchmarks + run: | + cargo bench diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 93081bc..d5d54e0 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -21,3 +21,8 @@ jobs: name: Run Cargo linters uses: ./.github/workflows/linters-cargo.yml needs: rust_build + + bench: + name: Run benchmarks + uses: ./.github/workflows/bench.yml + needs: rust_build diff --git a/Cargo.lock b/Cargo.lock index dcab050..e694862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -52,6 +52,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + [[package]] name = "arc-swap" version = "1.6.0" @@ -264,6 +276,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.83" @@ -294,6 +312,33 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -304,6 +349,31 @@ dependencies = [ "inout", ] +[[package]] +name = "clap" +version = "4.5.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -347,6 +417,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -356,6 +464,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -566,6 +684,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "either" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -899,6 +1023,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1153,6 +1287,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -1417,6 +1560,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "openssl" version = "0.10.61" @@ -1617,6 +1766,34 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "postgres-protocol" version = "0.6.6" @@ -1676,6 +1853,7 @@ dependencies = [ "axum-macros", "bigdecimal", "chrono", + "criterion", "deadpool", "diesel", "diesel-async", @@ -1849,6 +2027,26 @@ dependencies = [ "bitflags 2.4.1", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2091,6 +2289,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.22" @@ -2742,6 +2949,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3085,6 +3302,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 65cd909..8409366 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,3 +50,8 @@ uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] } [dev-dependencies] rstest = "0.18.2" +criterion = { version = "0.5", features = ["async_tokio"] } + +[[bench]] +name = "coingecko_benchmarks" +harness = false \ No newline at end of file diff --git a/benches/coingecko_benchmarks.rs b/benches/coingecko_benchmarks.rs new file mode 100644 index 0000000..568df82 --- /dev/null +++ b/benches/coingecko_benchmarks.rs @@ -0,0 +1,53 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use pragma_monitoring::constants::{initialize_coingecko_mappings, COINGECKO_IDS}; +use tokio::runtime::Runtime; + +fn criterion_benchmark(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + // Benchmark initialization + c.bench_function("initialize_coingecko_mappings", |b| { + b.iter(|| { + rt.block_on(async { + initialize_coingecko_mappings().await; + }); + }); + }); + + // Benchmark lookups + c.bench_function("coingecko_lookup", |b| { + rt.block_on(async { + initialize_coingecko_mappings().await; + }); + + b.iter(|| { + let mappings = COINGECKO_IDS.load(); + black_box(mappings.get("BTC/USD")); + }); + }); + + // Benchmark concurrent access + c.bench_function("concurrent_access", |b| { + rt.block_on(async { + initialize_coingecko_mappings().await; + }); + + b.iter(|| { + rt.block_on(async { + let handles: Vec<_> = (0..10) + .map(|_| { + tokio::spawn(async { + let mappings = COINGECKO_IDS.load(); + black_box(mappings.get("BTC/USD")); + }) + }) + .collect(); + + futures::future::join_all(handles).await; + }); + }); + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 6a36dce..a97cfbe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ pub(crate) mod config; -pub(crate) mod constants; +pub mod constants; pub mod models; pub mod schema; pub mod types; From 82298a8cf6ea6207e785f35724b3441df0064542 Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Thu, 12 Dec 2024 14:39:07 +0100 Subject: [PATCH 3/7] fix: ensure proper formatting in Cargo.toml for benchmarks section --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8409366..705e294 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,4 +54,4 @@ criterion = { version = "0.5", features = ["async_tokio"] } [[bench]] name = "coingecko_benchmarks" -harness = false \ No newline at end of file +harness = false From 11ccd85761592d3256fd1999e288c16d49b8f6fb Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Thu, 12 Dec 2024 18:29:18 +0100 Subject: [PATCH 4/7] Update src/constants.rs Co-authored-by: 0xevolve --- src/constants.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/constants.rs b/src/constants.rs index a6f85b3..1b5ebac 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -50,7 +50,7 @@ async fn get_coingecko_mappings() -> Result, Box = response.json().await?; From 70ed9f0f2a32a09e7690311f665aedfd0169de0f Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Thu, 12 Dec 2024 22:19:28 +0100 Subject: [PATCH 5/7] :sparkles: add CoinGecko API integration for fetching coin mappings --- src/coingecko.rs | 62 ++++++++++++++++++++++++++++++++++ src/constants.rs | 86 +++++------------------------------------------- src/lib.rs | 1 + src/main.rs | 2 ++ 4 files changed, 74 insertions(+), 77 deletions(-) create mode 100644 src/coingecko.rs diff --git a/src/coingecko.rs b/src/coingecko.rs new file mode 100644 index 0000000..9a1f906 --- /dev/null +++ b/src/coingecko.rs @@ -0,0 +1,62 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Coin { + id: String, + symbol: String, +} + +fn to_usd_pair(symbol: String) -> String { + format!("{}/USD", symbol) +} + +#[derive(Debug, thiserror::Error)] +pub enum CoinGeckoError { + #[error("Empty response from CoinGecko API")] + EmptyResponse, + #[error("Coingecko Coins API request failed with status: {0}")] + RequestFailed(String), + #[error("Coingecko Coins Failed to parse response: {0}")] + ParseError(#[from] reqwest::Error), +} + +pub async fn get_coingecko_mappings() -> Result, CoinGeckoError> { + let client = reqwest::Client::new(); + let mut mappings = HashMap::new(); + let mut page = 1; + + loop { + let url = format!( + "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page={}", + page + ); + + let response = client + .get(&url) + .header("User-Agent", "Crypto Data Fetcher") + .send() + .await + .map_err(CoinGeckoError::ParseError)?; + + if !response.status().is_success() { + return Err(CoinGeckoError::RequestFailed(response.status().to_string())); + } + + let coins: Vec = response.json().await?; + if coins.is_empty() { + if page == 1 { + return Err(CoinGeckoError::EmptyResponse); + } + break; + } + + coins.into_iter().for_each(|coin| { + mappings.insert(to_usd_pair(coin.symbol.to_uppercase()), coin.id); + }); + + page += 1; + } + + Ok(mappings) +} diff --git a/src/constants.rs b/src/constants.rs index 1b5ebac..d524c01 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,76 +1,12 @@ use arc_swap::ArcSwap; use lazy_static::lazy_static; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; +use std::collections::HashMap; use std::sync::Arc; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, error::Error}; - +use crate::coingecko::get_coingecko_mappings; pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6; -// Define the Coin struct to store the id and symbol of the coin. -#[derive(Debug, Serialize, Deserialize)] -pub struct Coin { - id: String, - symbol: String, -} - -// We already have the Link for the CoinGecko API, so we can use it to fetch the data. -// We will use the CoinGecko API to fetch the data. -// Example: https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page=1 -// We will fetch the data for the first 100 coins. -// here is the data how its look like: -// [ -// { -// "id": "bitcoin", -// "symbol": "btc", -// "name": "Bitcoin", -// "image": "https://coin-images.coingecko.com/coins/images/1/large/bitcoin.png?1696501400", -// "current_price": 100390, -// ... -// }, -// ... -// ] -#[allow(dead_code)] -async fn get_coingecko_mappings() -> Result, Box> { - let client = reqwest::Client::new(); - let mut mappings = HashMap::new(); - let mut page = 1; - - loop { - let url = format!( - "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}", - page - ); - - let response = client - .get(&url) - .header("User-Agent", "Crypto Data Fetcher") - .send() - .await?; - - if !response.status().is_success() { - return Err(format!("Coingecko Coins API request failed with status: {}", response.status()).into()); - } - - let coins: Vec = response.json().await?; - if coins.is_empty() { - break; - } - - for coin in coins { - // Convert symbol to uppercase and create pair format - let pair = format!("{}/USD", coin.symbol.to_uppercase()); - mappings.insert(pair, coin.id); - } - - page += 1; - } - - Ok(mappings) -} - -// Replace the static phf_map with a lazy_static ArcSwap lazy_static! { pub static ref COINGECKO_IDS: ArcSwap> = ArcSwap::new(Arc::new(HashMap::new())); @@ -78,17 +14,13 @@ lazy_static! { #[allow(dead_code)] pub async fn initialize_coingecko_mappings() { - match get_coingecko_mappings().await { - Ok(mappings) => { - COINGECKO_IDS.store(Arc::new(mappings)); - tracing::info!("Successfully initialized CoinGecko mappings"); - } - Err(e) => { - tracing::error!("Failed to initialize CoinGecko mappings: {}", e); - // You might want to panic here depending on how critical this is - // panic!("Failed to initialize CoinGecko mappings: {}", e); - } - } + let mappings = get_coingecko_mappings().await.unwrap_or_else(|e| { + tracing::error!("Failed to initialize CoinGecko mappings: {}", e); + panic!("Cannot start monitoring without CoinGecko mappings: {}", e); + }); + + COINGECKO_IDS.store(Arc::new(mappings)); + tracing::info!("Successfully initialized CoinGecko mappings"); } lazy_static! { diff --git a/src/lib.rs b/src/lib.rs index a97cfbe..9ba75c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod coingecko; pub(crate) mod config; pub mod constants; pub mod models; diff --git a/src/main.rs b/src/main.rs index 714a49a..f680aaa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,6 +21,8 @@ mod constants; mod types; // Utils mod utils; +// coingecko +mod coingecko; #[cfg(test)] mod tests; From 5f7c14e740e123c5c8b76b1a91e8b2494cfd627f Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Thu, 12 Dec 2024 22:24:44 +0100 Subject: [PATCH 6/7] :recycle: rename cargo-lint job to cargo-bench and adjust CoinGecko API pagination --- .github/workflows/bench.yml | 2 +- src/coingecko.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index d5690e8..d67ffba 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -6,7 +6,7 @@ on: workflow_call: jobs: - cargo-lint: + cargo-bench: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/src/coingecko.rs b/src/coingecko.rs index 9a1f906..cb39623 100644 --- a/src/coingecko.rs +++ b/src/coingecko.rs @@ -28,7 +28,7 @@ pub async fn get_coingecko_mappings() -> Result, CoinGec loop { let url = format!( - "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=250&page={}", + "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page={}", page ); From bf6d5698d13008e417b025a41e343ba0f16ac2b4 Mon Sep 17 00:00:00 2001 From: Yasser Tahiri Date: Fri, 13 Dec 2024 21:08:45 +0100 Subject: [PATCH 7/7] :recycle: refactor CoinGecko benchmarks to use mock data and implement retry logic for API initialization --- benches/coingecko_benchmarks.rs | 51 ++++++++++++++++++--------------- src/coingecko.rs | 2 +- src/constants.rs | 50 +++++++++++++++++++++++++++++--- 3 files changed, 75 insertions(+), 28 deletions(-) diff --git a/benches/coingecko_benchmarks.rs b/benches/coingecko_benchmarks.rs index 568df82..694c082 100644 --- a/benches/coingecko_benchmarks.rs +++ b/benches/coingecko_benchmarks.rs @@ -1,52 +1,57 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use pragma_monitoring::constants::{initialize_coingecko_mappings, COINGECKO_IDS}; +use pragma_monitoring::constants::COINGECKO_IDS; +use std::collections::HashMap; +use std::sync::Arc; use tokio::runtime::Runtime; +// Create a mock initialization function instead of calling the API +async fn initialize_test_mappings() { + let mut test_mappings = HashMap::new(); + // Add just a few test pairs + test_mappings.insert("BTC/USD".to_string(), "bitcoin".to_string()); + test_mappings.insert("ETH/USD".to_string(), "ethereum".to_string()); + test_mappings.insert("SOL/USD".to_string(), "solana".to_string()); + + COINGECKO_IDS.store(Arc::new(test_mappings)); +} + fn criterion_benchmark(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - // Benchmark initialization - c.bench_function("initialize_coingecko_mappings", |b| { - b.iter(|| { - rt.block_on(async { - initialize_coingecko_mappings().await; - }); - }); + // Initialize with test data + rt.block_on(async { + initialize_test_mappings().await; }); - // Benchmark lookups - c.bench_function("coingecko_lookup", |b| { - rt.block_on(async { - initialize_coingecko_mappings().await; - }); + let mut group = c.benchmark_group("coingecko_operations"); + // Benchmark simple lookup + group.bench_function("single_lookup", |b| { b.iter(|| { let mappings = COINGECKO_IDS.load(); - black_box(mappings.get("BTC/USD")); + black_box(mappings.get("BTC/USD").cloned()) }); }); - // Benchmark concurrent access - c.bench_function("concurrent_access", |b| { - rt.block_on(async { - initialize_coingecko_mappings().await; - }); - + // Benchmark concurrent lookups with smaller load + group.bench_function("concurrent_lookups", |b| { b.iter(|| { rt.block_on(async { - let handles: Vec<_> = (0..10) + let handles: Vec<_> = (0..3) // Reduced to just 3 concurrent lookups .map(|_| { tokio::spawn(async { let mappings = COINGECKO_IDS.load(); - black_box(mappings.get("BTC/USD")); + black_box(mappings.get("BTC/USD").cloned()) }) }) .collect(); - futures::future::join_all(handles).await; + futures::future::join_all(handles).await }); }); }); + + group.finish(); } criterion_group!(benches, criterion_benchmark); diff --git a/src/coingecko.rs b/src/coingecko.rs index cb39623..17b8da8 100644 --- a/src/coingecko.rs +++ b/src/coingecko.rs @@ -34,7 +34,7 @@ pub async fn get_coingecko_mappings() -> Result, CoinGec let response = client .get(&url) - .header("User-Agent", "Crypto Data Fetcher") + .header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36") .send() .await .map_err(CoinGeckoError::ParseError)?; diff --git a/src/constants.rs b/src/constants.rs index d524c01..307b325 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -2,7 +2,10 @@ use arc_swap::ArcSwap; use lazy_static::lazy_static; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use std::collections::HashMap; +use std::future::Future; use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; use crate::coingecko::get_coingecko_mappings; pub(crate) static LOW_SOURCES_THRESHOLD: usize = 6; @@ -12,17 +15,56 @@ lazy_static! { ArcSwap::new(Arc::new(HashMap::new())); } +const MAX_RETRIES: u32 = 3; +const RETRY_DELAY: Duration = Duration::from_secs(2); + #[allow(dead_code)] pub async fn initialize_coingecko_mappings() { - let mappings = get_coingecko_mappings().await.unwrap_or_else(|e| { - tracing::error!("Failed to initialize CoinGecko mappings: {}", e); - panic!("Cannot start monitoring without CoinGecko mappings: {}", e); - }); + let mappings = retry_with_backoff(get_coingecko_mappings) + .await + .unwrap_or_else(|e| { + tracing::error!( + "Failed to initialize CoinGecko mappings after {} retries: {}", + MAX_RETRIES, + e + ); + panic!("Cannot start monitoring without CoinGecko mappings: {}", e); + }); COINGECKO_IDS.store(Arc::new(mappings)); tracing::info!("Successfully initialized CoinGecko mappings"); } +async fn retry_with_backoff(f: F) -> Result +where + F: Fn() -> Fut, + Fut: Future>, + E: std::fmt::Display, +{ + let mut attempts = 0; + let mut last_error = None; + + while attempts < MAX_RETRIES { + match f().await { + Ok(result) => return Ok(result), + Err(e) => { + attempts += 1; + last_error = Some(e); + if attempts < MAX_RETRIES { + tracing::warn!( + "Attempt {} failed, retrying after {} seconds", + attempts, + RETRY_DELAY.as_secs() + ); + sleep(RETRY_DELAY).await; + } + } + } + } + + Err(last_error.unwrap()) +} + lazy_static! { /// TODO: Current storage of long tail assets here is not really good. /// We should probably store them either in a yaml config file or a