diff --git a/.env.example b/.env.example index d06ffb7..0a12ace 100644 --- a/.env.example +++ b/.env.example @@ -8,4 +8,6 @@ MAINNET_RPC_URL= # Config NETWORK=testnet ORACLE_ADDRESS=0x -PAIRS=BTC/USD,ETH/USD \ No newline at end of file +PAIRS=BTC/USD,ETH/USD +IGNORE_SOURCES=BITSTAMP,DEFILLAMA +IGNORE_PUBLISHERS=BINANCE \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 841c6b7..c35c4e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1416,6 +1416,12 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.29" @@ -1485,6 +1491,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "good_lp" version = "1.7.0" @@ -2044,6 +2056,7 @@ dependencies = [ "phf", "prometheus", "reqwest", + "rstest", "serde", "starknet", "starknet_api", @@ -2775,6 +2788,35 @@ dependencies = [ "rustc-hex", ] +[[package]] +name = "rstest" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +dependencies = [ 
+ "cfg-if", + "glob", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.41", + "unicode-ident", +] + [[package]] name = "rustc-demangle" version = "0.1.23" diff --git a/Cargo.toml b/Cargo.toml index 11aad28..9685399 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,3 +43,6 @@ strum = { version = "0.25.0", features = ["derive"] } tokio = { version = "1", features = ["full"] } url = "2.5.0" uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] } + +[dev-dependencies] +rstest = "0.18.2" diff --git a/prometheus/alertmanager.yml b/prometheus/alertmanager.yml index dd7c2b1..e7d10f2 100644 --- a/prometheus/alertmanager.yml +++ b/prometheus/alertmanager.yml @@ -1,18 +1,16 @@ global: - smtp_smarthost: "${SMTP_HOST}" - smtp_from: "${SMTP_FROM}" - smtp_auth_username: "${SMTP_AUTH_USERNAME}" - smtp_auth_password: "${SMTP_AUTH_PASSWORD}" - smtp_require_tls: true + resolve_timeout: 5m route: group_by: ["instance", "severity"] group_wait: 30s group_interval: 5m repeat_interval: 4h - receiver: "email_configs" + receiver: "main-telegram" + receivers: - - name: "email_configs" - email_configs: - - to: "${EMAIL_TO}" - send_resolved: true + - name: "main-telegram" + telegram_configs: + - bot_token: "${TELEGRAM_BOT_TOKEN}" + chat_id: -1001904637278 + parse_mode: "" diff --git a/prometheus/alerts.rules.yml b/prometheus/alerts.rules.yml index 89b5141..d074cb0 100644 --- a/prometheus/alerts.rules.yml +++ b/prometheus/alerts.rules.yml @@ -1,11 +1,35 @@ groups: - - name: example + - name: Oracle rules: - alert: TimeSinceLastUpdateTooHigh expr: time_since_last_update_seconds > 1200 for: 5m labels: - severity: critical + severity: warning annotations: summary: "Time since the last update is too high" - description: "The time since the last update of {{ $labels.pair }} from {{ $labels.source }} has exceeded 1200 seconds." + description: "The time since the last update from {{ $labels.publisher }} has exceeded 1200 seconds." 
+ - alert: WrongPrice + expr: price_deviation > 0.02 + for: 5m + labels: + severity: critical + annotations: + summary: "Price deviation is too high" + description: "The price deviation of {{ $labels.pair }} from {{ $labels.source }} has exceeded 2%." + - alert: TooFewSources + expr: num_sources < 5 + for: 5m + labels: + severity: critical + annotations: + summary: "Too few sources" + description: "The number of sources for {{ $labels.pair }} has fallen below 5." + - alert: SourceDeviation + expr: price_deviation_source > 0.02 + for: 5m + labels: + severity: critical + annotations: + summary: "Source deviation is too high" + description: "The source deviation of {{ $labels.pair }} from {{ $labels.source }} has exceeded 2%." diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml index eb6203e..3ae8df3 100644 --- a/prometheus/prometheus.yml +++ b/prometheus/prometheus.yml @@ -6,10 +6,12 @@ scrape_configs: - job_name: "prometheus_monitoring" static_configs: - targets: ["host.docker.internal:8081"] + rule_files: - "alerts.rules.yml" + alerting: alertmanagers: - static_configs: - targets: - - localhost:9093 + - host.docker.internal:9093 diff --git a/scripts/db.sh b/scripts/db.sh new file mode 100644 index 0000000..98a99e5 --- /dev/null +++ b/scripts/db.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -ex +cd "$(dirname "$0")" + +docker rm -f oracle_monitoring +docker run -d --name=oracle_monitoring -p 5432:5432 -e POSTGRES_PASSWORD=password postgres +sleep 5 + +DATABASE_URL=postgresql://postgres:password@localhost:5432/test cargo test \ No newline at end of file diff --git a/scripts/prom.sh b/scripts/prom.sh new file mode 100644 index 0000000..8fa88af --- /dev/null +++ b/scripts/prom.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -e +cd "$(dirname "$0")" + +echo "Starting prometheus on: http://localhost:9090" + +docker rm prom +docker run \ + -p 9090:9090 \ + -v $(pwd)/../prometheus:/etc/prometheus \ + --name prom \ + prom/prometheus \ No newline at end of file 
diff --git a/src/config.rs b/src/config.rs index 27a8008..ae986bb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,8 @@ pub enum NetworkName { Mainnet, #[strum(ascii_case_insensitive)] Testnet, + #[strum(ascii_case_insensitive)] + Katana, } #[derive(Debug, Clone)] @@ -93,6 +95,26 @@ impl Config { }, } } + NetworkName::Katana => { + let url = Url::parse("http://localhost:5050").expect("Invalid JSON RPC URL"); + let rpc_client = JsonRpcClient::new(HttpTransport::new(url)); // Katana URL + + let (decimals, sources, publishers, publisher_registry_address) = + init_oracle_config(&rpc_client, oracle_address, pairs.clone()).await; + + Self { + pairs, + sources, + publishers, + decimals, + network: Network { + name: "katana".to_string(), + provider: Arc::new(rpc_client), + oracle_address, + publisher_registry_address, + }, + } + } } } } @@ -140,9 +162,27 @@ async fn init_oracle_config( let publishers = publishers[1..].to_vec(); + // Exclude publishers that are not supported by the monitoring service + let excluded_publishers = std::env::var("IGNORE_PUBLISHERS") + .unwrap_or("".to_string()) + .split(',') + .map(|publisher| publisher.to_string()) + .collect::<Vec<String>>(); + + let publishers = publishers + .into_iter() + .filter(|publisher| !excluded_publishers.contains(publisher)) + .collect::<Vec<String>>(); + let mut sources: HashMap<String, Vec<String>> = HashMap::new(); let mut decimals: HashMap<String, u32> = HashMap::new(); + let excluded_sources = std::env::var("IGNORE_SOURCES") + .unwrap_or("".to_string()) + .split(',') + .map(|source| source.to_string()) + .collect::<Vec<String>>(); + for pair in &pairs { let field_pair = cairo_short_string_to_felt(pair).unwrap(); @@ -212,7 +252,7 @@ async fn init_oracle_config( for source in spot_pair_sources { let source = parse_cairo_short_string(&source).unwrap(); - if !pair_sources.contains(&source) { + if !pair_sources.contains(&source) && !excluded_sources.contains(&source) { pair_sources.push(source); } } diff --git a/src/constants.rs b/src/constants.rs index 5c4cd71..317fc58 
100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,4 +1,6 @@ +use lazy_static::lazy_static; use phf::phf_map; +use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; pub(crate) static COINGECKO_IDS: phf::Map<&'static str, &'static str> = phf_map! { "BTC/USD" => "bitcoin", @@ -11,3 +13,51 @@ pub(crate) static COINGECKO_IDS: phf::Map<&'static str, &'static str> = phf_map! "WSTETH/USD" => "wrapped-steth", "LORDS/USD" => "lords", }; + +lazy_static! { + pub static ref TIME_SINCE_LAST_UPDATE_PUBLISHER: GaugeVec = register_gauge_vec!( + opts!( + "time_since_last_update_seconds", + "Time since the last update in seconds." + ), + &["publisher"] + ) + .unwrap(); + pub static ref PAIR_PRICE: GaugeVec = register_gauge_vec!( + opts!("pair_price", "Price of the pair from the source."), + &["pair", "source"] + ) + .unwrap(); + pub static ref TIME_SINCE_LAST_UPDATE_PAIR_ID: GaugeVec = register_gauge_vec!( + opts!( + "time_since_last_update_pair_id", + "Time since the last update in seconds." + ), + &["pair"] + ) + .unwrap(); + pub static ref PRICE_DEVIATION: GaugeVec = register_gauge_vec!( + opts!( + "price_deviation", + "Price deviation from the reference price." + ), + &["pair", "source"] + ) + .unwrap(); + pub static ref PRICE_DEVIATION_SOURCE: GaugeVec = register_gauge_vec!( + opts!( + "price_deviation_source", + "Price deviation from the reference price." + ), + &["pair", "source"] + ) + .unwrap(); + pub static ref NUM_SOURCES: IntGaugeVec = register_int_gauge_vec!( + opts!( + "num_sources", + "Number of sources that have published data for a pair." 
+ ), + &["pair"] + ) + .unwrap(); +} diff --git a/src/error.rs b/src/error.rs index 0a42ba7..1a6296d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,7 @@ use std::{error::Error as StdError, fmt}; +use starknet::providers::ProviderError; + #[derive(Debug)] pub enum MonitoringError { Price(String), @@ -8,6 +10,7 @@ pub enum MonitoringError { Api(String), Conversion(String), OnChain(String), + Provider(ProviderError), } impl StdError for MonitoringError {} @@ -21,6 +24,7 @@ impl fmt::Display for MonitoringError { MonitoringError::Api(e) => write!(f, "API Error: {}", e), MonitoringError::Conversion(e) => write!(f, "Conversion Error: {}", e), MonitoringError::OnChain(e) => write!(f, "OnChain Error: {}", e), + MonitoringError::Provider(e) => write!(f, "Provider Error: {}", e), } } } diff --git a/src/main.rs b/src/main.rs index a2dc94d..8edfe3d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,15 @@ use config::Config; use config::NetworkName; use diesel_async::pooled_connection::deadpool::*; use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use diesel_async::AsyncPgConnection; + use dotenv::dotenv; use starknet::core::types::FieldElement; use std::env; use std::str::FromStr; +use crate::process_data::is_syncing; + // Configuration mod config; // Error handling @@ -28,6 +32,9 @@ mod schema; // Constants mod constants; +#[cfg(test)] +mod tests; + #[tokio::main] async fn main() { env_logger::init(); @@ -57,9 +64,30 @@ async fn main() { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + monitor(&pool, monitoring_config, &mut interval, true).await; +} + +pub(crate) async fn monitor( + pool: &deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>, + monitoring_config: Config, + interval: &mut tokio::time::Interval, + wait_for_syncing: bool, +) { loop { interval.tick().await; // Wait for the next tick + // Skip if indexer is still syncing + if wait_for_syncing { + if let Some(blocks_left) = + is_syncing(pool.clone(), 
monitoring_config.network.provider.clone()) + .await + .unwrap() + { + log::info!("Indexer is still syncing ♻️ blocks left: {}", blocks_left); + continue; + } + } + let tasks: Vec<_> = monitoring_config .clone() .sources diff --git a/src/models.rs b/src/models.rs index 380ab3f..8b47bc9 100644 --- a/src/models.rs +++ b/src/models.rs @@ -14,10 +14,10 @@ pub struct SpotEntry { pub data_id: String, pub block_hash: String, pub block_number: i64, - pub block_timestamp: Option<NaiveDateTime>, + pub block_timestamp: NaiveDateTime, pub transaction_hash: String, pub price: BigDecimal, - pub timestamp: Option<NaiveDateTime>, + pub timestamp: chrono::NaiveDateTime, pub publisher: String, pub source: String, pub volume: BigDecimal, @@ -33,10 +33,10 @@ pub struct FutureEntry { pub data_id: String, pub block_hash: String, pub block_number: i64, - pub block_timestamp: Option<NaiveDateTime>, + pub block_timestamp: NaiveDateTime, pub transaction_hash: String, pub price: BigDecimal, - pub timestamp: Option<NaiveDateTime>, + pub timestamp: chrono::NaiveDateTime, pub publisher: String, pub source: String, pub volume: BigDecimal, diff --git a/src/monitoring/price_deviation.rs b/src/monitoring/price_deviation.rs index fc2c366..694baa0 100644 --- a/src/monitoring/price_deviation.rs +++ b/src/monitoring/price_deviation.rs @@ -39,7 +39,8 @@ pub async fn price_deviation( let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id"); let request_url = format!( - "https://coins.llama.fi/prices/current/coingecko:{id}", + "https://coins.llama.fi/prices/historical/{timestamp}/coingecko:{id}", + timestamp = query.timestamp.timestamp(), id = coingecko_id, ); @@ -52,8 +53,6 @@ pub async fn price_deviation( .await .map_err(|e| MonitoringError::Api(e.to_string()))?; - // TODO: Check returned timestamp - let api_id = format!("coingecko:{}", coingecko_id); let reference_price = coins_prices diff --git a/src/monitoring/source_deviation.rs b/src/monitoring/source_deviation.rs index 3b22e11..95718a1 100644 --- a/src/monitoring/source_deviation.rs +++ 
b/src/monitoring/source_deviation.rs @@ -1,7 +1,7 @@ use bigdecimal::ToPrimitive; use starknet::{ core::{ - types::{BlockId, BlockTag, FieldElement, FunctionCall}, + types::{BlockId, FieldElement, FunctionCall}, utils::cairo_short_string_to_felt, }, macros::selector, @@ -27,18 +27,27 @@ pub async fn source_deviation( entry_point_selector: selector!("get_data_median"), calldata: vec![FieldElement::ZERO, field_pair], }, - BlockId::Tag(BlockTag::Latest), + BlockId::Number(query.block_number.try_into().unwrap()), ) .await .map_err(|e| MonitoringError::OnChain(e.to_string()))?; - let decimals = config.decimals.get(&query.pair_id).unwrap(); + let decimals = config + .decimals + .get(&query.pair_id) + .ok_or(MonitoringError::OnChain(format!( + "Failed to get decimals for pair {:?}", + query.pair_id + )))?; + let on_chain_price = data .first() - .unwrap() + .ok_or(MonitoringError::OnChain("No data".to_string()))? .to_big_decimal(*decimals) .to_f64() - .unwrap(); + .ok_or(MonitoringError::Conversion( + "Failed to convert to f64".to_string(), + ))?; let deviation = (normalized_price - on_chain_price) / on_chain_price; let num_sources_aggregated = (*data.get(3).unwrap()).try_into().map_err(|e| { diff --git a/src/monitoring/time_since_last_update.rs b/src/monitoring/time_since_last_update.rs index e2ca64c..ae8d67b 100644 --- a/src/monitoring/time_since_last_update.rs +++ b/src/monitoring/time_since_last_update.rs @@ -4,8 +4,7 @@ use std::time::SystemTime; /// Calculate the time since the last update in seconds. 
pub fn time_since_last_update(query: &SpotEntry) -> u64 { - let datetime: DateTime<Utc> = - TimeZone::from_utc_datetime(&Utc, &query.timestamp.expect("Failed to get timestamp")); + let datetime: DateTime<Utc> = TimeZone::from_utc_datetime(&Utc, &query.timestamp); let timestamp = datetime.timestamp(); let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH); diff --git a/src/process_data.rs b/src/process_data.rs index 089bd8b..4c5695b 100644 --- a/src/process_data.rs +++ b/src/process_data.rs @@ -1,7 +1,15 @@ extern crate diesel; extern crate dotenv; +use std::sync::Arc; + use crate::config::Config; +use crate::constants::NUM_SOURCES; +use crate::constants::PAIR_PRICE; +use crate::constants::PRICE_DEVIATION; +use crate::constants::PRICE_DEVIATION_SOURCE; +use crate::constants::TIME_SINCE_LAST_UPDATE_PAIR_ID; +use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER; use crate::diesel::QueryDsl; use crate::error::MonitoringError; use crate::models::SpotEntry; @@ -15,73 +23,9 @@ use diesel::ExpressionMethods; use diesel_async::pooled_connection::AsyncDieselConnectionManager; use diesel_async::AsyncPgConnection; use diesel_async::RunQueryDsl; -use lazy_static::lazy_static; -use prometheus::register_int_gauge_vec; -use prometheus::IntGaugeVec; -use prometheus::{opts, register_gauge_vec, GaugeVec}; - -lazy_static! { - static ref TIME_SINCE_LAST_UPDATE_SOURCE: GaugeVec = register_gauge_vec!( - opts!( - "time_since_last_update_seconds", - "Time since the last update in seconds." - ), - &["source"] - ) - .unwrap(); -} - -lazy_static! { - static ref PAIR_PRICE: GaugeVec = register_gauge_vec!( - opts!("pair_price", "Price of the pair from the source."), - &["pair", "source"] - ) - .unwrap(); -} - -lazy_static! { - static ref TIME_SINCE_LAST_UPDATE_PAIR_ID: GaugeVec = register_gauge_vec!( - opts!( - "time_since_last_update_pair_id", - "Time since the last update in seconds." - ), - &["pair"] - ) - .unwrap(); -} - -lazy_static! 
{ - static ref PRICE_DEVIATION: GaugeVec = register_gauge_vec!( - opts!( - "price_deviation", - "Price deviation from the reference price." - ), - &["pair", "source"] - ) - .unwrap(); -} - -lazy_static! { - static ref PRICE_DEVIATION_SOURCE: GaugeVec = register_gauge_vec!( - opts!( - "price_deviation_source", - "Price deviation from the reference price." - ), - &["source"] - ) - .unwrap(); -} - -lazy_static! { - static ref NUM_SOURCES: IntGaugeVec = register_int_gauge_vec!( - opts!( - "num_sources", - "Number of sources that have published data for a pair." - ), - &["pair"] - ) - .unwrap(); -} +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::JsonRpcClient; +use starknet::providers::Provider; pub async fn process_data_by_pair( pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>, @@ -154,14 +98,16 @@ pub async fn process_data_by_pair_and_source( match filtered_by_source_result { Ok(data) => { - let time = time_since_last_update(&data); - - let time_labels = TIME_SINCE_LAST_UPDATE_SOURCE.with_label_values(&[src]); + // Get the labels + let time_labels = + TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[&data.publisher]); let price_labels = PAIR_PRICE.with_label_values(&[pair, src]); let deviation_labels = PRICE_DEVIATION.with_label_values(&[pair, src]); - let source_deviation_labels = PRICE_DEVIATION_SOURCE.with_label_values(&[src]); + let source_deviation_labels = PRICE_DEVIATION_SOURCE.with_label_values(&[pair, src]); + let num_sources_labels = NUM_SOURCES.with_label_values(&[pair]); + // Compute metrics + let time = time_since_last_update(&data); let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price( "Failed to convert price to f64".to_string(), ))?; @@ -171,6 +117,7 @@ pub async fn process_data_by_pair_and_source( let (source_deviation, num_sources_aggregated) = source_deviation(&data, normalized_price, config.clone()).await?; + // Set the metrics price_labels.set(normalized_price); time_labels.set(time as f64); 
deviation_labels.set(deviation); @@ -182,3 +129,36 @@ pub async fn process_data_by_pair_and_source( Err(e) => Err(e.into()), } } + +/// Checks if the indexer is still syncing. +/// Returns the number of blocks left to sync if it is still syncing. +pub async fn is_syncing( + pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>, + provider: Arc<JsonRpcClient<HttpTransport>>, +) -> Result<Option<u64>, MonitoringError> { + let mut conn = pool + .get() + .await + .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?; + + let latest_entry: Result<SpotEntry, diesel::result::Error> = spot_entry + .order(block_timestamp.desc()) + .first(&mut conn) + .await; + + match latest_entry { + Ok(entry) => { + let block_n = entry.block_number as u64; + let current_block = provider + .block_number() + .await + .map_err(MonitoringError::Provider)?; + if block_n < current_block { + Ok(Some(current_block - block_n)) + } else { + Ok(None) + } + } + Err(_) => Ok(None), + } +} diff --git a/src/schema.rs b/src/schema.rs index 27c6713..98c0770 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -2,37 +2,132 @@ diesel::table! { future_entry (data_id) { + #[max_length = 255] network -> Varchar, + #[max_length = 255] pair_id -> Varchar, + #[max_length = 255] data_id -> Varchar, + #[max_length = 255] block_hash -> Varchar, - block_number -> BigInt, - block_timestamp -> Nullable<Timestamp>, + block_number -> Int8, + block_timestamp -> Timestamp, + #[max_length = 255] transaction_hash -> Varchar, price -> Numeric, - timestamp -> Nullable<Timestamp>, + timestamp -> Timestamp, + #[max_length = 255] publisher -> Varchar, + #[max_length = 255] source -> Varchar, volume -> Numeric, expiration_timestamp -> Nullable<Timestamp>, - _cursor -> BigInt, + _cursor -> Int8, } } diesel::table! 
{ - spot_entry (data_id) { + mainnet_future_entry (data_id) { + #[max_length = 255] network -> Varchar, + #[max_length = 255] pair_id -> Varchar, + #[max_length = 255] data_id -> Varchar, + #[max_length = 255] block_hash -> Varchar, - block_number -> BigInt, - block_timestamp -> Nullable<Timestamp>, + block_number -> Int8, + block_timestamp -> Timestamp, + #[max_length = 255] transaction_hash -> Varchar, price -> Numeric, + timestamp -> Timestamp, + #[max_length = 255] + publisher -> Varchar, + #[max_length = 255] + source -> Varchar, + volume -> Numeric, + expiration_timestamp -> Timestamp, + _cursor -> Int8, + } +} + +diesel::table! { + mainnet_spot_entry (data_id) { + #[max_length = 255] + network -> Nullable<Varchar>, + #[max_length = 255] + pair_id -> Nullable<Varchar>, + #[max_length = 255] + data_id -> Varchar, + #[max_length = 255] + block_hash -> Nullable<Varchar>, + block_number -> Nullable<Int8>, + block_timestamp -> Nullable<Timestamp>, + #[max_length = 255] + transaction_hash -> Nullable<Varchar>, + price -> Nullable<Numeric>, timestamp -> Nullable<Timestamp>, + #[max_length = 255] + publisher -> Nullable<Varchar>, + #[max_length = 255] + source -> Nullable<Varchar>, + volume -> Nullable<Numeric>, + _cursor -> Nullable<Int8>, + } +} + +diesel::table! { + spot_entry (timestamp) { + #[max_length = 255] + network -> Varchar, + #[max_length = 255] + pair_id -> Varchar, + #[max_length = 255] + data_id -> Varchar, + #[max_length = 255] + block_hash -> Varchar, + block_number -> Int8, + block_timestamp -> Timestamp, + #[max_length = 255] + transaction_hash -> Varchar, + price -> Numeric, + timestamp -> Timestamp, + #[max_length = 255] publisher -> Varchar, + #[max_length = 255] source -> Varchar, volume -> Numeric, - _cursor -> BigInt, + _cursor -> Int8, } } + +diesel::table! 
{ + vrf_requests (data_id) { + #[max_length = 255] + network -> Varchar, + request_id -> Numeric, + seed -> Numeric, + created_at -> Timestamp, + created_at_tx -> Varchar, + #[max_length = 255] + callback_address -> Varchar, + callback_fee_limit -> Numeric, + num_words -> Numeric, + requestor_address -> Varchar, + updated_at -> Timestamp, + updated_at_tx -> Varchar, + status -> Numeric, + minimum_block_number -> Numeric, + _cursor -> Int8range, + data_id -> Varchar, + } +} + +diesel::allow_tables_to_appear_in_same_query!( + future_entry, + mainnet_future_entry, + mainnet_spot_entry, + spot_entry, + vrf_requests, +); diff --git a/src/tests/common/constants.rs b/src/tests/common/constants.rs new file mode 100644 index 0000000..f13d286 --- /dev/null +++ b/src/tests/common/constants.rs @@ -0,0 +1,2 @@ +pub const PUBLISHER_ADDRESS: &str = + "0x0624EBFB99865079BD58CFCFB925B6F5CE940D6F6E41E118B8A72B7163FB435C"; diff --git a/src/tests/common/fixtures.rs b/src/tests/common/fixtures.rs new file mode 100644 index 0000000..40c60ad --- /dev/null +++ b/src/tests/common/fixtures.rs @@ -0,0 +1,30 @@ +use deadpool::managed::Pool; +use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use rstest::fixture; +use starknet::core::types::FieldElement; + +use crate::config::{Config, NetworkName}; + +#[fixture] +pub fn database() -> Pool<AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>> { + // Setup database connection + let database_url = "postgres://postgres:postgres@localhost:5432/postgres"; + let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(database_url); + let pool: Pool<AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>> = + Pool::builder(config).build().unwrap(); + + pool +} + +#[fixture] +pub async fn test_config() -> Config { + Config::new( + NetworkName::Katana, + FieldElement::from_hex_be( + "0x06df335982dddce41008e4c03f2546fa27276567b5274c7d0c1262f3c2b5d167", + ) + .unwrap(), + vec!["ETH/USD".to_string(), "BTC/USD".to_string()], + ) + .await +} diff --git a/src/tests/common/mod.rs b/src/tests/common/mod.rs new file mode 100644 index 0000000..bd7b06f --- 
/dev/null +++ b/src/tests/common/mod.rs @@ -0,0 +1,3 @@ +pub mod constants; +pub mod fixtures; +pub mod utils; diff --git a/src/tests/common/utils.rs b/src/tests/common/utils.rs new file mode 100644 index 0000000..908707b --- /dev/null +++ b/src/tests/common/utils.rs @@ -0,0 +1,100 @@ +use std::time::Duration; + +use starknet::{ + accounts::{single_owner::SignError, Account, AccountError, Call, SingleOwnerAccount}, + core::{ + chain_id, + types::{FieldElement, InvokeTransactionResult}, + utils::{cairo_short_string_to_felt, get_selector_from_name}, + }, + providers::{jsonrpc::HttpTransport, JsonRpcClient}, + signers::local_wallet::SignError as SigningError, + signers::{LocalWallet, SigningKey}, +}; + +use super::constants::PUBLISHER_ADDRESS; + +/// Wait for a condition to be true, with a timeout. +pub async fn wait_for_expect<F, T>( + mut condition: F, + timeout: Duration, + interval: Duration, +) -> Option<T> +where + F: FnMut() -> Option<T>, +{ + let start = tokio::time::Instant::now(); + while tokio::time::Instant::now() - start < timeout { + if let Some(result) = condition() { + return Some(result); + } + tokio::time::sleep(interval).await; + } + None +} + +type RpcAccount<'a> = SingleOwnerAccount<&'a JsonRpcClient<HttpTransport>, LocalWallet>; + +pub fn build_single_owner_account<'a>( + rpc: &'a JsonRpcClient<HttpTransport>, + private_key: &str, + account_address: &str, + is_legacy: bool, +) -> RpcAccount<'a> { + let signer = LocalWallet::from(SigningKey::from_secret_scalar( + FieldElement::from_hex_be(private_key).unwrap(), + )); + let account_address = + FieldElement::from_hex_be(account_address).expect("Invalid Contract Address"); + let execution_encoding = if is_legacy { + starknet::accounts::ExecutionEncoding::Legacy + } else { + starknet::accounts::ExecutionEncoding::New + }; + SingleOwnerAccount::new( + rpc, + signer, + account_address, + chain_id::TESTNET, + execution_encoding, + ) +} + +pub async fn publish_data( + provider: &JsonRpcClient<HttpTransport>, + oracle_address: FieldElement, + pair_id: &str, + 
timestamp: &str, + price: &str, + source: &str, + publisher: &str, +) -> Result<InvokeTransactionResult, AccountError<SignError<SigningError>>> { + let publisher_account = build_single_owner_account( + provider, + &std::env::var("SIGNER_PRIVATE").expect("SIGNER_PRIVATE env var not set"), + PUBLISHER_ADDRESS, + false, + ); + + let pair_id = cairo_short_string_to_felt(pair_id).expect("Invalid pair id"); + let timestamp = FieldElement::from_dec_str(timestamp).expect("Invalid timestamp"); + let price = FieldElement::from_dec_str(price).expect("Invalid price"); + let source = cairo_short_string_to_felt(source).expect("Invalid source"); + let publisher = cairo_short_string_to_felt(publisher).expect("Invalid publisher"); + + let calls = vec![Call { + to: oracle_address, + selector: get_selector_from_name("publish_data").unwrap(), + calldata: vec![ + FieldElement::ZERO, + timestamp, + source, + publisher, + price, + pair_id, + FieldElement::ZERO, + ], + }]; + let tx = publisher_account.execute(calls); + tx.send().await +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs new file mode 100644 index 0000000..bf7f563 --- /dev/null +++ b/src/tests/mod.rs @@ -0,0 +1,4 @@ +mod common; + +#[cfg(test)] +mod monitoring; diff --git a/src/tests/monitoring.rs b/src/tests/monitoring.rs new file mode 100644 index 0000000..d910260 --- /dev/null +++ b/src/tests/monitoring.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use crate::{ + config::Config, + monitor, + tests::common::{ + fixtures::{database, test_config}, + utils::{publish_data, wait_for_expect}, + }, +}; +use deadpool::managed::Pool; +use diesel_async::pooled_connection::AsyncDieselConnectionManager; +use rstest::rstest; +use tokio::sync::Mutex; + +#[rstest] +#[tokio::test] +#[ignore = "Blocked by #002"] +async fn detects_publisher_down( + database: Pool<AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>>, + #[future] test_config: Config, +) { + let mut _conn = database.get().await.unwrap(); + let config = test_config.await; + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + let database = 
Arc::new(Mutex::new(database)); + let db_clone = database.clone(); + let config_clone = config.clone(); + let monitor_handle = tokio::spawn(async move { + let db = db_clone.lock().await; + monitor(&db, config_clone, &mut interval, false).await; + }); + + // Publish a wrong price + let provider = config.network.provider; + + // Publish 0 for the price of BTC/USD pair + let pair_id = "BTC/USD"; + let timestamp = &chrono::Utc::now().timestamp().to_string(); + let price = "0"; + let source = "BITSTAMP"; + let publisher = "PRAGMA"; + + publish_data( + &provider, + config.network.oracle_address, + pair_id, + timestamp, + price, + source, + publisher, + ) + .await + .unwrap(); + + // Check that the metrics are updated + let res = wait_for_expect( + || { + // Gather the metrics. + let metrics = prometheus::gather(); + println!("Metrics: {:?}", metrics); + + let price_deviation = metrics.iter().find(|m| m.get_name() == "price_deviation"); + + if let Some(price_deviation) = price_deviation { + let metrics = price_deviation.get_metric(); + let btc_deviation = metrics.iter().find(|m| { + m.get_label() + .iter() + .any(|x| x.get_name() == "pair" && x.get_value() == "BTC/USD") + }); + println!("BTC deviation metric: {:?}", btc_deviation); + return Some(()); + } + + None + }, + tokio::time::Duration::from_secs(60), + tokio::time::Duration::from_secs(5), + ) + .await; + + assert!(res.is_some()); + + // Clean up + monitor_handle.abort(); +}