From 2ce1d288e18a740f273e8146eb81f8748e0e8851 Mon Sep 17 00:00:00 2001
From: azurwastaken
Date: Tue, 8 Oct 2024 11:36:41 +0200
Subject: [PATCH 1/8] yes

---
 src/config.rs            |  3 +++
 src/main.rs              | 54 ++++++++++++++++++++++++++++++++++++++++
 src/processing/future.rs |  3 +++
 src/processing/spot.rs   | 38 ++++++++++++++++++++++++++++
 4 files changed, 98 insertions(+)

diff --git a/src/config.rs b/src/config.rs
index 32e518b..14b8c6a 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -31,6 +31,8 @@ pub enum NetworkName {
     Mainnet,
     #[strum(ascii_case_insensitive)]
     Testnet,
+    #[strum(ascii_case_insensitive, serialize = "pragma-devnet")]
+    PragmaDevnet,
 }
 
 #[derive(Debug, EnumString, IntoStaticStr, PartialEq, Eq, Hash, Clone, Display)]
@@ -162,6 +164,7 @@ impl Config {
         match self.network.name {
             NetworkName::Mainnet => format!("mainnet_{}", table_name),
             NetworkName::Testnet => table_name.to_string(),
+            NetworkName::PragmaDevnet => table_name.to_string(),
         }
     }
 
diff --git a/src/main.rs b/src/main.rs
index 1449b79..e508963 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -59,6 +59,7 @@ async fn main() {
     // Monitor spot/future in parallel
     let spot_monitoring = tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot));
     let future_monitoring = tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future));
+    let pragmagix_monitoring = tokio::spawn(pragmagix_monitor(pool.clone(), true, &DataType::Spot));
     let publisher_monitoring = tokio::spawn(publisher_monitor(pool.clone(), false));
     let api_monitoring = tokio::spawn(api_monitor());
     let vrf_monitoring = tokio::spawn(vrf_monitor(pool.clone()));
@@ -273,3 +274,56 @@ pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
+
+pub(crate) async fn pragmagix_monitor(
+    pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
+    wait_for_syncing: bool,
+    data_type: &DataType,
+) {
+    let monitoring_config = get_config(None).await;
+
+    let mut interval = interval(Duration::from_secs(30));
+
+    loop {
+        interval.tick().await; // Wait for the next tick
+
+        // Skip if indexer is still syncing
+        if wait_for_syncing && !indexers_are_synced(data_type).await {
+            continue;
+        }
+
+        let tasks: Vec<_> = monitoring_config
+            .sources(data_type.clone())
+            .iter()
+            .flat_map(|(pair, sources)| {
+                if is_long_tail_asset(pair) {
+                    vec![tokio::spawn(Box::pin(
+                        processing::spot::process_long_tail_asset(
+                            pool.clone(),
+                            pair.clone(),
+                            sources.to_vec(),
+                        ),
+                    ))]
+                } else {
+                    vec![
+                        tokio::spawn(Box::pin(processing::spot::process_data_by_pair(
+                            pool.clone(),
+                            pair.clone(),
+                        ))),
+                        tokio::spawn(Box::pin(
+                            processing::spot::process_data_by_pair_and_sources(
+                                pool.clone(),
+                                pair.clone(),
+                                sources.to_vec(),
+                            ),
+                        )),
+                    ]
+                }
+            })
+            .collect();
+
+        let results: Vec<_> = futures::future::join_all(tasks).await;
+        log_tasks_results(data_type.into(), results);
+    }
+}
\ No newline at end of file
diff --git a/src/processing/future.rs b/src/processing/future.rs
index 9785e08..4ce619e 100644
--- a/src/processing/future.rs
+++ b/src/processing/future.rs
@@ -50,6 +50,7 @@ pub async fn process_data_by_pair(
             .first(&mut conn)
             .await?
         }
+        NetworkName::PragmaDevnet => unreachable!(),
     };
 
     log::info!("Processing data for pair: {}", pair);
@@ -129,6 +130,7 @@ pub async fn process_data_by_pair_and_source(
             .first(&mut conn)
             .await?
         }
+        NetworkName::PragmaDevnet => unreachable!(),
     };
 
     let network_env = &config.network_str();
@@ -181,6 +183,7 @@ pub async fn process_data_by_publisher(
             .first(&mut conn)
             .await?
         }
+        NetworkName::PragmaDevnet => unreachable!(),
     };
 
     log::info!("Processing data for publisher: {}", publisher);
diff --git a/src/processing/spot.rs b/src/processing/spot.rs
index e6cc7ac..2d3f0e6 100644
--- a/src/processing/spot.rs
+++ b/src/processing/spot.rs
@@ -40,6 +40,7 @@ pub async fn process_data_by_pair(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("testnet"))
                 .filter(testnet_dsl::pair_id.eq(pair.clone()))
                 .order(testnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
@@ -52,6 +53,14 @@ pub async fn process_data_by_pair(
             .first(&mut conn)
             .await?
         }
+        NetworkName::PragmaDevnet => {
+            testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("pragma-devnet"))
+                .filter(testnet_dsl::pair_id.eq(pair.clone()))
+                .order(testnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+        },
     };
 
     log::info!("Processing data for pair: {}", pair);
@@ -112,6 +121,16 @@ pub async fn process_data_by_pair_and_source(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("testnet"))
+                .filter(testnet_dsl::pair_id.eq(pair))
+                .filter(testnet_dsl::source.eq(src))
+                .order(testnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+            }
+        NetworkName::PragmaDevnet => {
+            testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("pragma-devnet"))
                 .filter(testnet_dsl::pair_id.eq(pair))
                 .filter(testnet_dsl::source.eq(src))
                 .order(testnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
                 .await?
@@ -166,6 +185,15 @@ pub async fn process_data_by_publisher(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("testnet"))
+                .filter(testnet_dsl::publisher.eq(publisher.clone()))
+                .order(testnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+            }
+        NetworkName::PragmaDevnet => {
+            testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("pragma-devnet"))
                 .filter(testnet_dsl::publisher.eq(publisher.clone()))
                 .order(testnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
                 .await?
@@ -242,6 +270,16 @@ pub async fn get_price_deviation_for_source_from_chain(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("testnet"))
+                .filter(testnet_dsl::pair_id.eq(pair))
+                .filter(testnet_dsl::source.eq(source))
+                .order(testnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+            }
+        NetworkName::PragmaDevnet => {
+            testnet_dsl::spot_entry
+                .filter(testnet_dsl::network.eq("pragma-devnet"))
                 .filter(testnet_dsl::pair_id.eq(pair))
                 .filter(testnet_dsl::source.eq(source))
                 .order(testnet_dsl::block_timestamp.desc())
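What the new `serialize = "pragma-devnet"` attribute buys: the kebab-case network name used in configs and in the indexed `network` column parses straight into the new variant. A self-contained sketch assuming the `strum`/`strum_macros` crates the enum already derives from:

```rust
use std::str::FromStr;

use strum_macros::EnumString;

#[derive(Debug, PartialEq, EnumString)]
pub enum NetworkName {
    #[strum(ascii_case_insensitive)]
    Mainnet,
    #[strum(ascii_case_insensitive)]
    Testnet,
    // `serialize` replaces the default string form for this variant.
    #[strum(ascii_case_insensitive, serialize = "pragma-devnet")]
    PragmaDevnet,
}

fn main() {
    assert_eq!(NetworkName::from_str("pragma-devnet"), Ok(NetworkName::PragmaDevnet));
    // Case-insensitive thanks to `ascii_case_insensitive`.
    assert_eq!(NetworkName::from_str("PRAGMA-DEVNET"), Ok(NetworkName::PragmaDevnet));
    // The CamelCase variant name itself no longer matches once `serialize` overrides it.
    assert!(NetworkName::from_str("PragmaDevnet").is_err());
}
```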
From b275ea8951ce725a3670866c91162b765b020c7f Mon Sep 17 00:00:00 2001
From: akhercha
Date: Wed, 9 Oct 2024 16:29:55 +0200
Subject: [PATCH 2/8] feat(PragmaGix-monitoring): Dispatch monitor 🫡
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                  |   5 ++
 src/config.rs              |   7 +-
 src/constants.rs           |  21 ++++++
 src/main.rs                | 148 +++++++++++++++++--------------------
 src/models.rs              |  53 ++++++++-----
 src/processing/dispatch.rs |  53 +++++++++++++
 src/processing/mod.rs      |   1 +
 src/processing/spot.rs     |   2 +-
 src/schema.rs              |  96 +++++++++++++++++++++++-
 src/utils.rs               |  13 +++-
 10 files changed, 295 insertions(+), 104 deletions(-)
 create mode 100644 src/processing/dispatch.rs

diff --git a/README.md b/README.md
index 06bd558..53a347c 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,11 @@ It then processes the data and computes the following metrics:
 - `vrf_time_since_last_handle_request{network}`: Time since the last VRF request was handled for a given network.
 - `vrf_time_since_oldest_request_in_pending_status{network}`: Time in seconds that the oldest pending VRF request has been in the pending status for a given network.
 
+Metrics specifics to our Pragma App Chain:
+- `dispatch_event_latest_block`: The latest block that triggered a Dispatch event from Hyperlane,
+- `dispatch_event_feed_latest_block_update`: The latest block that triggered a Dispatch event from Hyperlane for a specific Feed ID,
+- `dispatch_event_nb_feeds_updated`: The number of feeds updated per Dispatch event at a given block.
+
 ## Shared Public Access
 
 Monitoring is not publicicly available yet but databases will soon be in read-only mode.
diff --git a/src/config.rs b/src/config.rs
index 14b8c6a..5b222be 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -164,13 +164,18 @@ impl Config {
         match self.network.name {
             NetworkName::Mainnet => format!("mainnet_{}", table_name),
             NetworkName::Testnet => table_name.to_string(),
-            NetworkName::PragmaDevnet => table_name.to_string(),
+            NetworkName::PragmaDevnet => format!("pragma_devnet_{}", table_name),
         }
     }
 
     pub fn all_publishers(&self) -> &HashMap<String, Felt> {
         &self.publishers
     }
+
+    /// Check if the configuration is set for a Pragma Chain
+    pub fn is_pragma_chain(&self) -> bool {
+        matches!(self.network.name, NetworkName::PragmaDevnet)
+    }
 }
 
 #[derive(Debug, Clone)]
diff --git a/src/constants.rs b/src/constants.rs
index e3216ee..de3cade 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -207,6 +207,27 @@ lazy_static! {
         &["network"]
     )
     .unwrap();
+    pub static ref DISPATCH_EVENT_LATEST_BLOCK: GaugeVec = register_gauge_vec!(
+        opts!(
+            "dispatch_event_latest_block",
+            "The latest block that triggered a Dispatch event from Hyperlane"
+        ),
+        &["network"]
+    ).unwrap();
+    pub static ref DISPATCH_EVENT_FEED_LATEST_BLOCK_UPDATE: GaugeVec = register_gauge_vec!(
+        opts!(
+            "dispatch_event_feed_latest_block_update",
+            "The latest block that triggered a Dispatch event from Hyperlane for a specific Feed ID"
+        ),
+        &["network", "feed_id"]
+    ).unwrap();
+    pub static ref DISPATCH_EVENT_NB_FEEDS_UPDATED: GaugeVec = register_gauge_vec!(
+        opts!(
+            "dispatch_event_nb_feeds_updated",
+            "The number of feeds updated per Dispatch event at a given block"
+        ),
+        &["network", "block"]
+    ).unwrap();
 }
 
 #[allow(unused)]
diff --git a/src/main.rs b/src/main.rs
index e508963..ba444c8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -25,6 +25,7 @@ mod utils;
 #[cfg(test)]
 mod tests;
 
+use std::collections::HashMap;
 use std::time::Duration;
 use std::{env, vec};
 
@@ -32,11 +33,17 @@
 use deadpool::managed::Pool;
 use diesel_async::pooled_connection::AsyncDieselConnectionManager;
 use diesel_async::AsyncPgConnection;
 use dotenv::dotenv;
+use tokio::task::JoinHandle;
 use tokio::time::interval;
 
 use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
 use processing::common::{check_publisher_balance, indexers_are_synced};
-use utils::{is_long_tail_asset, log_tasks_results};
+use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};
+
+struct MonitoringTask {
+    name: String,
+    handle: JoinHandle<()>,
+}
 
 #[tokio::main]
 async fn main() {
@@ -56,48 +63,61 @@ async fn main() {
     // Set the long tail asset list
     init_long_tail_asset_configuration();
 
+
     // Monitor spot/future in parallel
-    let spot_monitoring = tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot));
-    let future_monitoring = tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future));
-    let pragmagix_monitoring = tokio::spawn(pragmagix_monitor(pool.clone(), true, &DataType::Spot));
-    let publisher_monitoring = tokio::spawn(publisher_monitor(pool.clone(), false));
-    let api_monitoring = tokio::spawn(api_monitor());
-    let vrf_monitoring = tokio::spawn(vrf_monitor(pool.clone()));
-
-    let config_update = tokio::spawn(periodic_config_update());
-
-    // Wait for the monitoring to finish
-    let results = futures::future::join_all(vec![
-        spot_monitoring,
-        future_monitoring,
-        api_monitoring,
-        publisher_monitoring,
-        vrf_monitoring,
-        config_update,
-    ])
-    .await;
-
-    // Check if any of the monitoring tasks failed
-    if let Err(e) = &results[0] {
-        log::error!("[SPOT] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[1] {
-        log::error!("[FUTURE] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[2] {
-        log::error!("[API] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[3] {
-        log::error!("[PUBLISHERS] Monitoring failed: {:?}", e);
-    }
+    let monitoring_tasks = spawn_monitoring_tasks(pool.clone(), &monitoring_config).await;
+    handle_task_results(monitoring_tasks).await;
+}
 
-    if let Err(e) = &results[4] {
-        log::error!("[VRF] Monitoring failed: {:?}", e);
+async fn spawn_monitoring_tasks(
+    pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
+    monitoring_config: &config::Config,
+) -> Vec<MonitoringTask> {
+    let mut tasks = vec![
+        MonitoringTask {
+            name: "Config Update".to_string(),
+            handle: tokio::spawn(periodic_config_update()),
+        },
+        MonitoringTask {
+            name: "Spot Monitoring".to_string(),
+            handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot)),
+        },
+        MonitoringTask {
+            name: "Future Monitoring".to_string(),
+            handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)),
+        },
+        MonitoringTask {
+            name: "Publisher Monitoring".to_string(),
+            handle: tokio::spawn(publisher_monitor(pool.clone(), false)),
+        },
+    ];
+
+    if monitoring_config.is_pragma_chain() {
+        tasks.push(MonitoringTask {
+            name: "Hyperlane Dispatches Monitoring".to_string(),
+            handle: tokio::spawn(hyperlane_dispatch_monitor(pool.clone(), true)),
+        });
+    } else {
+        tasks.push(MonitoringTask {
+            name: "API Monitoring".to_string(),
+            handle: tokio::spawn(api_monitor()),
+        });
+        tasks.push(MonitoringTask {
+            name: "VRF Monitoring".to_string(),
+            handle: tokio::spawn(vrf_monitor(pool.clone())),
+        });
     }
 
-    if let Err(e) = &results[5] {
-        log::error!("[CONFIG] Config Update failed: {:?}", e);
+    tasks
+}
+
+async fn handle_task_results(tasks: Vec<MonitoringTask>) {
+    let mut results = HashMap::new();
+    for task in tasks {
+        let result = task.handle.await;
+        results.insert(task.name, result);
     }
+    log_monitoring_results(results);
 }
 
 pub(crate) async fn api_monitor() {
@@ -275,55 +295,25 @@ pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
 
-pub(crate) async fn pragmagix_monitor(
+pub(crate) async fn hyperlane_dispatch_monitor(
     pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
     wait_for_syncing: bool,
-    data_type: &DataType,
 ) {
-    let monitoring_config = get_config(None).await;
-
     let mut interval = interval(Duration::from_secs(30));
 
     loop {
-        interval.tick().await; // Wait for the next tick
-
         // Skip if indexer is still syncing
-        if wait_for_syncing && !indexers_are_synced(data_type).await {
-            continue;
+        if wait_for_syncing {
+            // TODO: Adapt this for dispatch events
+            log::info!("Not implemented yet. TODO");
         }
 
-        let tasks: Vec<_> = monitoring_config
-            .sources(data_type.clone())
-            .iter()
-            .flat_map(|(pair, sources)| {
-                if is_long_tail_asset(pair) {
-                    vec![tokio::spawn(Box::pin(
-                        processing::spot::process_long_tail_asset(
-                            pool.clone(),
-                            pair.clone(),
-                            sources.to_vec(),
-                        ),
-                    ))]
-                } else {
-                    vec![
-                        tokio::spawn(Box::pin(processing::spot::process_data_by_pair(
-                            pool.clone(),
-                            pair.clone(),
-                        ))),
-                        tokio::spawn(Box::pin(
-                            processing::spot::process_data_by_pair_and_sources(
-                                pool.clone(),
-                                pair.clone(),
-                                sources.to_vec(),
-                            ),
-                        )),
-                    ]
-                }
-            })
-            .collect();
-
+        let tasks: Vec<_> = vec![tokio::spawn(Box::pin(
+            processing::dispatch::process_dispatch_events(pool.clone()),
+        ))];
         let results: Vec<_> = futures::future::join_all(tasks).await;
-        log_tasks_results(data_type.into(), results);
+        log_tasks_results("Dispatch", results);
+
+        interval.tick().await; // Wait for the next tick
     }
-}
\ No newline at end of file
+}
diff --git a/src/models.rs b/src/models.rs
index 87cc618..3f57891 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -90,32 +90,45 @@ pub struct VrfRequest {
     pub data_id: String,
 }
 
-
 #[derive(Queryable, Debug, QueryableByName, Selectable)]
 #[diesel(primary_key(data_id))]
 #[diesel(check_for_backend(diesel::pg::Pg))]
 #[diesel(table_name = crate::schema::oo_requests)]
 pub struct OORequest {
-    pub network: String,
-    pub data_id: String,
-    pub assertion_id: String,
-    pub domain_id: String,
-    pub claim: String,
-    pub asserter: String,
-    pub disputer: Option<String>,
-    pub disputed: Option<bool>,
+    pub network: String,
+    pub data_id: String,
+    pub assertion_id: String,
+    pub domain_id: String,
+    pub claim: String,
+    pub asserter: String,
+    pub disputer: Option<String>,
+    pub disputed: Option<bool>,
     pub dispute_id: Option<String>,
-    pub callback_recipient: String,
-    pub escalation_manager: String,
-    pub caller: String,
-    pub expiration_timestamp: NaiveDateTime,
+    pub callback_recipient: String,
+    pub escalation_manager: String,
+    pub caller: String,
+    pub expiration_timestamp: NaiveDateTime,
     pub settled: Option<bool>,
-    pub settlement_resolution: Option<bool>,
-    pub settle_caller: Option<String>,
-    pub currency: String,
-    pub bond: BigDecimal,
-    pub _cursor: (Bound<i64>, Bound<i64>),
-    pub identifier: String,
+    pub settlement_resolution: Option<bool>,
+    pub settle_caller: Option<String>,
+    pub currency: String,
+    pub bond: BigDecimal,
+    pub _cursor: (Bound<i64>, Bound<i64>),
+    pub identifier: String,
     pub updated_at: NaiveDateTime,
     pub updated_at_tx: String,
-}
\ No newline at end of file
+}
+
+#[derive(Debug, Queryable, Selectable, QueryableByName)]
+#[diesel(table_name = crate::schema::pragma_devnet_dispatch_event)]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+pub struct PragmaDevnetDispatchEvent {
+    pub network: String,
+    pub block_hash: String,
+    pub block_number: i64,
+    pub block_timestamp: NaiveDateTime,
+    pub transaction_hash: String,
+    pub hyperlane_message_nonce: BigDecimal,
+    pub feeds_updated: Option<Vec<String>>,
+    pub _cursor: (Bound<i64>, Bound<i64>),
+}
diff --git a/src/processing/dispatch.rs b/src/processing/dispatch.rs
new file mode 100644
index 0000000..cd3e85a
--- /dev/null
+++ b/src/processing/dispatch.rs
@@ -0,0 +1,53 @@
+use deadpool::managed::Pool;
+use diesel::prelude::*;
+use diesel_async::pooled_connection::AsyncDieselConnectionManager;
+use diesel_async::{AsyncPgConnection, RunQueryDsl};
+
+use crate::constants::{
+    DISPATCH_EVENT_FEED_LATEST_BLOCK_UPDATE, DISPATCH_EVENT_LATEST_BLOCK,
+    DISPATCH_EVENT_NB_FEEDS_UPDATED,
+};
+use crate::schema::pragma_devnet_dispatch_event::dsl as dispatch_dsl;
+use crate::{config::get_config, error::MonitoringError, models::PragmaDevnetDispatchEvent};
+
+/// Read the database of the indexed Dispatch events and populate the metrics:
+/// * dispatch_event_latest_block,
+/// * dispatch_event_feed_latest_block_update,
+/// * dispatch_event_nb_feeds_updated.
+pub async fn process_dispatch_events(
+    pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
+) -> Result<(), MonitoringError> {
+    let config = get_config(None).await;
+    let network = config.network_str();
+
+    let mut conn = pool.get().await.map_err(MonitoringError::Connection)?;
+
+    // Query the latest dispatch event
+    let latest_event = dispatch_dsl::pragma_devnet_dispatch_event
+        .filter(dispatch_dsl::network.eq(network))
+        .order(dispatch_dsl::block_number.desc())
+        .first::<PragmaDevnetDispatchEvent>(&mut conn)
+        .await
+        .optional()?;
+
+    if let Some(event) = latest_event {
+        DISPATCH_EVENT_LATEST_BLOCK
+            .with_label_values(&[network])
+            .set(event.block_number as f64);
+
+        if let Some(feeds_updated) = event.feeds_updated {
+            let nb_feeds_updated = feeds_updated.len() as f64;
+            DISPATCH_EVENT_NB_FEEDS_UPDATED
+                .with_label_values(&[network, &event.block_number.to_string()])
+                .set(nb_feeds_updated);
+
+            for feed in feeds_updated {
+                DISPATCH_EVENT_FEED_LATEST_BLOCK_UPDATE
+                    .with_label_values(&[network, &feed])
+                    .set(event.block_number as f64);
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/processing/mod.rs b/src/processing/mod.rs
index 674ec7e..e14e64a 100644
--- a/src/processing/mod.rs
+++ b/src/processing/mod.rs
@@ -1,5 +1,6 @@
 pub mod api;
 pub mod common;
+pub mod dispatch;
 pub mod future;
 pub mod spot;
 pub mod vrf;
diff --git a/src/processing/spot.rs b/src/processing/spot.rs
index 2d3f0e6..3e0fef8 100644
--- a/src/processing/spot.rs
+++ b/src/processing/spot.rs
@@ -60,7 +60,7 @@ pub async fn process_data_by_pair(
             .order(testnet_dsl::block_timestamp.desc())
             .first(&mut conn)
             .await?
-        },
+        }
     };
 
     log::info!("Processing data for pair: {}", pair);
diff --git a/src/schema.rs b/src/schema.rs
index 8395c86..5f7e695 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -25,6 +25,32 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    pragma_devnet_future_entry (data_id) {
+        #[max_length = 255]
+        network -> Varchar,
+        #[max_length = 255]
+        pair_id -> Varchar,
+        #[max_length = 255]
+        data_id -> Varchar,
+        #[max_length = 255]
+        block_hash -> Varchar,
+        block_number -> Int8,
+        block_timestamp -> Timestamp,
+        #[max_length = 255]
+        transaction_hash -> Varchar,
+        price -> Numeric,
+        timestamp -> Timestamp,
+        #[max_length = 255]
+        publisher -> Varchar,
+        #[max_length = 255]
+        source -> Varchar,
+        volume -> Numeric,
+        expiration_timestamp -> Nullable<Timestamp>,
+        _cursor -> Int8,
+    }
+}
+
 diesel::table! {
     mainnet_future_entry (data_id) {
         #[max_length = 255]
@@ -101,6 +127,31 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    pragma_devnet_spot_entry (data_id) {
+        #[max_length = 255]
+        network -> Varchar,
+        #[max_length = 255]
+        pair_id -> Varchar,
+        #[max_length = 255]
+        data_id -> Varchar,
+        #[max_length = 255]
+        block_hash -> Varchar,
+        block_number -> Int8,
+        block_timestamp -> Timestamp,
+        #[max_length = 255]
+        transaction_hash -> Varchar,
+        price -> Numeric,
+        timestamp -> Timestamp,
+        #[max_length = 255]
+        publisher -> Varchar,
+        #[max_length = 255]
+        source -> Varchar,
+        volume -> Numeric,
+        _cursor -> Int8,
+    }
+}
+
 diesel::table! {
     mainnet_spot_checkpoints (pair_id) {
         #[max_length = 255]
@@ -149,6 +200,30 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    pragma_devnet_spot_checkpoints (data_id) {
+        #[max_length = 255]
+        network -> Varchar,
+        #[max_length = 255]
+        pair_id -> Varchar,
+        #[max_length = 255]
+        data_id -> Varchar,
+        #[max_length = 255]
+        block_hash -> Varchar,
+        block_number -> Int8,
+        block_timestamp -> Timestamp,
+        #[max_length = 255]
+        transaction_hash -> Varchar,
+        price -> Numeric,
+        #[max_length = 255]
+        sender_address -> Varchar,
+        aggregation_mode -> Numeric,
+        _cursor -> Int8,
+        timestamp -> Timestamp,
+        nb_sources_aggregated -> Numeric,
+    }
+}
+
 diesel::table! {
     vrf_requests (data_id) {
         #[max_length = 255]
@@ -171,7 +246,6 @@ diesel::table! {
     }
 }
 
-
 diesel::table! {
     oo_requests (data_id) {
         #[max_length = 255]
@@ -209,13 +283,31 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    pragma_devnet_dispatch_event (block_number) {
+        #[max_length = 255]
+        network -> Varchar,
+        #[max_length = 255]
+        block_hash -> Varchar,
+        block_number -> Int8,
+        block_timestamp -> Timestamp,
+        #[max_length = 255]
+        transaction_hash -> Varchar,
+        hyperlane_message_nonce -> Numeric,
+        feeds_updated -> Nullable<Array<Text>>,
+        _cursor -> Int8range,
+    }
+}
+
 diesel::allow_tables_to_appear_in_same_query!(
-    future_entry,
     mainnet_future_entry,
     mainnet_spot_checkpoints,
     mainnet_spot_entry,
+    future_entry,
     spot_checkpoints,
     spot_entry,
+    pragma_devnet_future_entry,
+    pragma_devnet_spot_entry,
+    pragma_devnet_spot_checkpoints,
     vrf_requests,
 );
diff --git a/src/utils.rs b/src/utils.rs
index 70d0d1d..2f0225f 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{collections::HashMap, fmt::Display};
 
 use num_bigint::BigUint;
 use starknet::core::types::Felt;
@@ -41,6 +41,17 @@ pub(crate) fn log_tasks_results(
     }
 }
 
+/// Process or output the results of tokio monitoring tasks
+#[allow(dead_code)]
+pub(crate) fn log_monitoring_results(results: HashMap<String, Result<(), tokio::task::JoinError>>) {
+    for (task_name, result) in results {
+        match result {
+            Ok(_) => log::info!("[{}] Monitoring completed successfully", task_name),
+            Err(e) => log::error!("[{}] Monitoring failed: {:?}", task_name, e),
+        }
+    }
+}
+
 /// Check if the provided pair in a long tail asset.
 #[allow(dead_code)]
 pub(crate) fn is_long_tail_asset(pair: &str) -> bool {
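The rewrite above trades positional `results[i]` checks, which silently pointed at the wrong task whenever the list changed shape, for handles carried next to their names. A runnable reduction of that pattern, with stub tasks standing in for the real monitors:

```rust
use std::collections::HashMap;
use tokio::task::JoinHandle;

struct MonitoringTask {
    name: String,
    handle: JoinHandle<()>,
}

#[tokio::main]
async fn main() {
    let tasks = vec![
        MonitoringTask {
            name: "Spot Monitoring".to_string(),
            handle: tokio::spawn(async { /* long-running monitor loop */ }),
        },
        MonitoringTask {
            name: "VRF Monitoring".to_string(),
            handle: tokio::spawn(async { panic!("simulated failure") }),
        },
    ];

    // Awaiting in order is fine: the real monitors are infinite loops that only
    // return (or panic) on failure, so every outcome is worth logging by name.
    let mut results = HashMap::new();
    for task in tasks {
        results.insert(task.name, task.handle.await);
    }
    for (name, result) in results {
        match result {
            Ok(()) => println!("[{name}] Monitoring completed successfully"),
            Err(e) => eprintln!("[{name}] Monitoring failed: {e:?}"),
        }
    }
}
```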
From d5d0e8a1f5ab83ba1ebc7bb699e6b861c138442e Mon Sep 17 00:00:00 2001
From: akhercha
Date: Wed, 9 Oct 2024 16:32:41 +0200
Subject: [PATCH 3/8] feat(PragmaGix-monitoring): Linters :)

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 53a347c..b994683 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,7 @@ It then processes the data and computes the following metrics:
 - `vrf_time_since_oldest_request_in_pending_status{network}`: Time in seconds that the oldest pending VRF request has been in the pending status for a given network.
 
 Metrics specifics to our Pragma App Chain:
+
 - `dispatch_event_latest_block`: The latest block that triggered a Dispatch event from Hyperlane,
 - `dispatch_event_feed_latest_block_update`: The latest block that triggered a Dispatch event from Hyperlane for a specific Feed ID,
 - `dispatch_event_nb_feeds_updated`: The number of feeds updated per Dispatch event at a given block.
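For reference, the dispatch metrics documented above are ordinary Prometheus gauges with labels (see `src/constants.rs` in patch 2). A minimal sketch of the register-then-set lifecycle, assuming the `prometheus` and `lazy_static` crates; the "pragma-devnet" label value is illustrative only:

```rust
use lazy_static::lazy_static;
use prometheus::{opts, register_gauge_vec, GaugeVec};

lazy_static! {
    static ref DISPATCH_EVENT_LATEST_BLOCK: GaugeVec = register_gauge_vec!(
        opts!(
            "dispatch_event_latest_block",
            "The latest block that triggered a Dispatch event from Hyperlane"
        ),
        &["network"]
    )
    .unwrap();
}

fn main() {
    // One time series per distinct label set; `set` overwrites the previous sample,
    // so the gauge always reports the newest block seen.
    DISPATCH_EVENT_LATEST_BLOCK
        .with_label_values(&["pragma-devnet"])
        .set(1234.0);
}
```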
From 7493a4ebf32c1b3205f28c1bd3df8f86b9c97b60 Mon Sep 17 00:00:00 2001
From: akhercha
Date: Wed, 9 Oct 2024 16:43:39 +0200
Subject: [PATCH 4/8] feat(PragmaGix-monitoring): Check sync status for
 Dispatch events

---
 src/main.rs              | 13 +++++------
 src/processing/common.rs | 50 +++++++++++++++++++++++++++++++++++++---
 2 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index ba444c8..bdcde99 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -37,7 +37,7 @@ use tokio::task::JoinHandle;
 use tokio::time::interval;
 
 use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
-use processing::common::{check_publisher_balance, indexers_are_synced};
+use processing::common::{check_publisher_balance, data_indexers_are_synced, indexers_are_synced};
 use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};
 
 struct MonitoringTask {
@@ -167,7 +167,7 @@ pub(crate) async fn onchain_monitor(
         interval.tick().await; // Wait for the next tick
 
         // Skip if indexer is still syncing
-        if wait_for_syncing && !indexers_are_synced(data_type).await {
+        if wait_for_syncing && !data_indexers_are_synced(data_type).await {
             continue;
         }
 
@@ -237,7 +237,7 @@ pub(crate) async fn publisher_monitor(
         interval.tick().await; // Wait for the next tick
 
         // Skip if indexer is still syncing
-        if wait_for_syncing && !indexers_are_synced(&DataType::Spot).await {
+        if wait_for_syncing && !data_indexers_are_synced(&DataType::Spot).await {
             continue;
         }
 
@@ -299,13 +299,12 @@ pub(crate) async fn hyperlane_dispatch_monitor(
     pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
     wait_for_syncing: bool,
 ) {
-    let mut interval = interval(Duration::from_secs(30));
+    let mut interval = interval(Duration::from_secs(5));
 
     loop {
         // Skip if indexer is still syncing
-        if wait_for_syncing {
-            // TODO: Adapt this for dispatch events
-            log::info!("Not implemented yet. TODO");
+        if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await {
+            continue;
         }
 
         let tasks: Vec<_> = vec![tokio::spawn(Box::pin(
diff --git a/src/processing/common.rs b/src/processing/common.rs
index 5ff0514..82290dc 100644
--- a/src/processing/common.rs
+++ b/src/processing/common.rs
@@ -20,7 +20,7 @@ pub struct IndexerServerStatus {
 
 /// Checks if indexers of the given data type are still syncing
 /// Returns true if any of the indexers is still syncing
-pub async fn is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
+pub async fn data_is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
     let config = get_config(None).await;
 
     let table_name = config.table_name(data_type.clone());
@@ -44,8 +44,8 @@ pub async fn is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
 }
 
 /// Check if the indexers are still syncing
-pub async fn indexers_are_synced(data_type: &DataType) -> bool {
-    match is_syncing(data_type).await {
+pub async fn data_indexers_are_synced(data_type: &DataType) -> bool {
+    match data_is_syncing(data_type).await {
         Ok(true) => {
             log::info!("[{data_type}] Indexers are still syncing ♻️");
             false
@@ -64,6 +64,50 @@ pub async fn data_indexers_are_synced(data_type: &DataType) -> bool {
     }
 }
 
+/// Checks if indexers of the given data type are still syncing
+/// Returns true if any of the indexers is still syncing
+pub async fn is_syncing(table_name: &str) -> Result<bool, MonitoringError> {
+    let config = get_config(None).await;
+
+    let status = get_sink_status(table_name, config.indexer_url()).await?;
+
+    let provider = &config.network().provider;
+
+    let blocks_left = blocks_left(&status, provider).await?;
+
+    // Update the prometheus metric
+    INDEXER_BLOCKS_LEFT
+        .with_label_values(&[
+            (&config.network().name).into(),
+            &table_name.to_string().to_ascii_lowercase(),
+        ])
+        .set(blocks_left.unwrap_or(0) as i64);
+
+    // Check if any indexer is still syncing
+    Ok(blocks_left.is_some())
+}
+
+/// Check if the indexers are still syncing
+pub async fn indexers_are_synced(table_name: &str) -> bool {
+    match is_syncing(table_name).await {
+        Ok(true) => {
+            log::info!("[{table_name}] Indexers are still syncing ♻️");
+            false
+        }
+        Ok(false) => {
+            log::info!("[{table_name}] Indexers are synced ✅");
+            true
+        }
+        Err(e) => {
+            log::error!(
+                "[{table_name}] Failed to check if indexers are syncing: {:?}",
+                e
+            );
+            false
+        }
+    }
+}
+
 /// Returns the status of the indexer
 ///
 /// # Arguments
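A condensed sketch of the contract behind the new table-name based `is_syncing`/`indexers_are_synced` pair: "no blocks left" (`None`) means the sink has caught up, and the gap is exported as a gauge either way. Here `blocks_left_for` is a hypothetical stand-in for the real sink-status lookup plus provider head comparison:

```rust
use lazy_static::lazy_static;
use prometheus::{opts, register_int_gauge_vec, IntGaugeVec};

lazy_static! {
    static ref INDEXER_BLOCKS_LEFT: IntGaugeVec = register_int_gauge_vec!(
        opts!("indexer_blocks_left", "Blocks the indexer still has to process"),
        &["network", "type"]
    )
    .unwrap();
}

async fn blocks_left_for(_table_name: &str) -> Option<u64> {
    Some(42) // stand-in: chain head minus the sink's last indexed block
}

async fn is_syncing(network: &str, table_name: &str) -> bool {
    let blocks_left = blocks_left_for(table_name).await;
    INDEXER_BLOCKS_LEFT
        .with_label_values(&[network, &table_name.to_ascii_lowercase()])
        .set(blocks_left.unwrap_or(0) as i64);
    blocks_left.is_some()
}

#[tokio::main]
async fn main() {
    // 42 blocks behind, so the monitors should keep skipping their work.
    assert!(is_syncing("pragma-devnet", "pragma_devnet_dispatch_event").await);
}
```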
From 0b01795582a279bea4db9fdd839c0e6776403cf1 Mon Sep 17 00:00:00 2001
From: akhercha
Date: Wed, 9 Oct 2024 17:08:33 +0200
Subject: [PATCH 5/8] feat(PragmaGix-monitoring):

---
 src/models.rs              |  6 +++---
 src/processing/dispatch.rs |  4 ++--
 src/processing/future.rs   | 26 +++++++++++++++++++++++---
 src/processing/spot.rs     | 37 +++++++++++++++++++------------------
 src/schema.rs              |  2 +-
 5 files changed, 48 insertions(+), 27 deletions(-)

diff --git a/src/models.rs b/src/models.rs
index 3f57891..a027244 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -119,10 +119,10 @@ pub struct OORequest {
     pub updated_at_tx: String,
 }
 
-#[derive(Debug, Queryable, Selectable, QueryableByName)]
+#[derive(Queryable, Debug, QueryableByName, Selectable)]
 #[diesel(table_name = crate::schema::pragma_devnet_dispatch_event)]
 #[diesel(check_for_backend(diesel::pg::Pg))]
-pub struct PragmaDevnetDispatchEvent {
+pub struct FeedDispatch {
     pub network: String,
     pub block_hash: String,
     pub block_number: i64,
@@ -130,5 +130,5 @@ pub struct FeedDispatch {
     pub transaction_hash: String,
     pub hyperlane_message_nonce: BigDecimal,
     pub feeds_updated: Option<Vec<String>>,
-    pub _cursor: (Bound<i64>, Bound<i64>),
+    pub _cursor: i64,
 }
diff --git a/src/processing/dispatch.rs b/src/processing/dispatch.rs
index cd3e85a..d1d465f 100644
--- a/src/processing/dispatch.rs
+++ b/src/processing/dispatch.rs
@@ -8,7 +8,7 @@ use crate::constants::{
     DISPATCH_EVENT_NB_FEEDS_UPDATED,
 };
 use crate::schema::pragma_devnet_dispatch_event::dsl as dispatch_dsl;
-use crate::{config::get_config, error::MonitoringError, models::PragmaDevnetDispatchEvent};
+use crate::{config::get_config, error::MonitoringError, models::FeedDispatch};
 
 /// Read the database of the indexed Dispatch events and populate the metrics:
 /// * dispatch_event_latest_block,
@@ -26,7 +26,7 @@ pub async fn process_dispatch_events(
     let latest_event = dispatch_dsl::pragma_devnet_dispatch_event
         .filter(dispatch_dsl::network.eq(network))
         .order(dispatch_dsl::block_number.desc())
-        .first::<PragmaDevnetDispatchEvent>(&mut conn)
+        .first::<FeedDispatch>(&mut conn)
         .await
         .optional()?;
 
diff --git a/src/processing/future.rs b/src/processing/future.rs
index 4ce619e..64b39f9 100644
--- a/src/processing/future.rs
+++ b/src/processing/future.rs
@@ -20,6 +20,7 @@ use crate::monitoring::{
 
 use crate::schema::future_entry::dsl as testnet_dsl;
 use crate::schema::mainnet_future_entry::dsl as mainnet_dsl;
+use crate::schema::pragma_devnet_future_entry::dsl as pragma_devnet_dsl;
 
 use bigdecimal::ToPrimitive;
 use diesel::ExpressionMethods;
@@ -50,7 +51,13 @@ pub async fn process_data_by_pair(
             .first(&mut conn)
             .await?
         }
-        NetworkName::PragmaDevnet => unreachable!(),
+        NetworkName::PragmaDevnet => {
+            pragma_devnet_dsl::pragma_devnet_future_entry
+                .filter(pragma_devnet_dsl::pair_id.eq(pair.clone()))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+        }
     };
 
     log::info!("Processing data for pair: {}", pair);
@@ -130,7 +137,14 @@ pub async fn process_data_by_pair_and_source(
             .first(&mut conn)
             .await?
         }
-        NetworkName::PragmaDevnet => unreachable!(),
+        NetworkName::PragmaDevnet => {
+            pragma_devnet_dsl::pragma_devnet_future_entry
+                .filter(pragma_devnet_dsl::pair_id.eq(pair))
+                .filter(pragma_devnet_dsl::source.eq(src))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+        }
     };
 
     let network_env = &config.network_str();
@@ -183,7 +197,13 @@ pub async fn process_data_by_publisher(
             .first(&mut conn)
             .await?
         }
-        NetworkName::PragmaDevnet => unreachable!(),
+        NetworkName::PragmaDevnet => {
+            pragma_devnet_dsl::pragma_devnet_future_entry
+                .filter(pragma_devnet_dsl::publisher.eq(publisher.clone()))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
+                .first(&mut conn)
+                .await?
+        }
     };
 
     log::info!("Processing data for publisher: {}", publisher);
diff --git a/src/processing/spot.rs b/src/processing/spot.rs
index 3e0fef8..9ff92d9 100644
--- a/src/processing/spot.rs
+++ b/src/processing/spot.rs
@@ -20,6 +20,7 @@ use crate::monitoring::{
 };
 
 use crate::schema::mainnet_spot_entry::dsl as mainnet_dsl;
+use crate::schema::pragma_devnet_spot_entry::dsl as pragma_devnet_dsl;
 use crate::schema::spot_entry::dsl as testnet_dsl;
 
 use bigdecimal::ToPrimitive;
@@ -54,10 +55,10 @@ pub async fn process_data_by_pair(
             .await?
         }
         NetworkName::PragmaDevnet => {
-            testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("pragma-devnet"))
-                .filter(testnet_dsl::pair_id.eq(pair.clone()))
-                .order(testnet_dsl::block_timestamp.desc())
+            pragma_devnet_dsl::pragma_devnet_spot_entry
+                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
+                .filter(pragma_devnet_dsl::pair_id.eq(pair.clone()))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
                 .await?
         }
@@ -129,11 +130,11 @@ pub async fn process_data_by_pair_and_source(
             .await?
         }
         NetworkName::PragmaDevnet => {
-            testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("pragma-devnet"))
-                .filter(testnet_dsl::pair_id.eq(pair))
-                .filter(testnet_dsl::source.eq(src))
-                .order(testnet_dsl::block_timestamp.desc())
+            pragma_devnet_dsl::pragma_devnet_spot_entry
+                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
+                .filter(pragma_devnet_dsl::pair_id.eq(pair))
+                .filter(pragma_devnet_dsl::source.eq(src))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
                 .await?
         }
@@ -192,10 +193,10 @@ pub async fn process_data_by_publisher(
             .await?
         }
         NetworkName::PragmaDevnet => {
-            testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("pragma-devnet"))
-                .filter(testnet_dsl::publisher.eq(publisher.clone()))
-                .order(testnet_dsl::block_timestamp.desc())
+            pragma_devnet_dsl::pragma_devnet_spot_entry
+                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
+                .filter(pragma_devnet_dsl::publisher.eq(publisher.clone()))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
                 .await?
         }
@@ -278,11 +279,11 @@ pub async fn get_price_deviation_for_source_from_chain(
             .await?
         }
         NetworkName::PragmaDevnet => {
-            testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("pragma-devnet"))
-                .filter(testnet_dsl::pair_id.eq(pair))
-                .filter(testnet_dsl::source.eq(source))
-                .order(testnet_dsl::block_timestamp.desc())
+            pragma_devnet_dsl::pragma_devnet_spot_entry
+                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
+                .filter(pragma_devnet_dsl::pair_id.eq(pair))
+                .filter(pragma_devnet_dsl::source.eq(source))
+                .order(pragma_devnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
                 .await?
         }
diff --git a/src/schema.rs b/src/schema.rs
index 5f7e695..f3e26bd 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -295,7 +295,7 @@ diesel::table! {
         transaction_hash -> Varchar,
         hyperlane_message_nonce -> Numeric,
         feeds_updated -> Nullable<Array<Text>>,
-        _cursor -> Int8range,
+        _cursor -> Int8,
     }
 }
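A compile-oriented sketch (not the repository's code) of the Diesel pattern these hunks repeat for the `pragma_devnet` tables: order by block timestamp descending and take the first row, i.e. "latest entry wins". It assumes `diesel` with the `postgres` and `chrono` features plus `diesel-async`; the three-column table is a deliberate reduction of the real schema:

```rust
use chrono::NaiveDateTime;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

diesel::table! {
    pragma_devnet_spot_entry (data_id) {
        data_id -> Varchar,
        pair_id -> Varchar,
        block_timestamp -> Timestamp,
    }
}

/// Latest row for one pair, mirroring the `NetworkName::PragmaDevnet` arms above.
pub async fn latest_spot_entry(
    conn: &mut AsyncPgConnection,
    pair: &str,
) -> QueryResult<(String, String, NaiveDateTime)> {
    use pragma_devnet_spot_entry::dsl;

    dsl::pragma_devnet_spot_entry
        .filter(dsl::pair_id.eq(pair))
        .order(dsl::block_timestamp.desc())
        .first(conn)
        .await
}
```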
From b30118fdd4e5d897c4183efa172fc14c08b90689 Mon Sep 17 00:00:00 2001
From: akhercha
Date: Wed, 9 Oct 2024 17:16:04 +0200
Subject: [PATCH 6/8] feat(PragmaGix-monitoring): Removed filter by network
 name

---
 src/processing/spot.rs | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/src/processing/spot.rs b/src/processing/spot.rs
index 9ff92d9..dad26ed 100644
--- a/src/processing/spot.rs
+++ b/src/processing/spot.rs
@@ -41,7 +41,6 @@ pub async fn process_data_by_pair(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("testnet"))
                 .filter(testnet_dsl::pair_id.eq(pair.clone()))
                 .order(testnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
@@ -56,7 +55,6 @@ pub async fn process_data_by_pair(
         }
         NetworkName::PragmaDevnet => {
             pragma_devnet_dsl::pragma_devnet_spot_entry
-                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
                 .filter(pragma_devnet_dsl::pair_id.eq(pair.clone()))
                 .order(pragma_devnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
@@ -122,7 +120,6 @@ pub async fn process_data_by_pair_and_source(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("testnet"))
                 .filter(testnet_dsl::pair_id.eq(pair))
                 .filter(testnet_dsl::source.eq(src))
                 .order(testnet_dsl::block_timestamp.desc())
@@ -131,7 +128,6 @@ pub async fn process_data_by_pair_and_source(
         }
         NetworkName::PragmaDevnet => {
             pragma_devnet_dsl::pragma_devnet_spot_entry
-                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
                 .filter(pragma_devnet_dsl::pair_id.eq(pair))
                 .filter(pragma_devnet_dsl::source.eq(src))
                 .order(pragma_devnet_dsl::block_timestamp.desc())
@@ -186,7 +182,6 @@ pub async fn process_data_by_publisher(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("testnet"))
                 .filter(testnet_dsl::publisher.eq(publisher.clone()))
                 .order(testnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
@@ -194,7 +189,6 @@ pub async fn process_data_by_publisher(
         }
         NetworkName::PragmaDevnet => {
             pragma_devnet_dsl::pragma_devnet_spot_entry
-                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
                 .filter(pragma_devnet_dsl::publisher.eq(publisher.clone()))
                 .order(pragma_devnet_dsl::block_timestamp.desc())
                 .first(&mut conn)
@@ -271,7 +265,6 @@ pub async fn get_price_deviation_for_source_from_chain(
     let data: SpotEntry = match config.network().name {
         NetworkName::Testnet => {
             testnet_dsl::spot_entry
-                .filter(testnet_dsl::network.eq("testnet"))
                 .filter(testnet_dsl::pair_id.eq(pair))
                 .filter(testnet_dsl::source.eq(source))
                 .order(testnet_dsl::block_timestamp.desc())
@@ -280,7 +273,6 @@ pub async fn get_price_deviation_for_source_from_chain(
         }
         NetworkName::PragmaDevnet => {
             pragma_devnet_dsl::pragma_devnet_spot_entry
-                .filter(pragma_devnet_dsl::network.eq("pragma-devnet"))
                 .filter(pragma_devnet_dsl::pair_id.eq(pair))
                 .filter(pragma_devnet_dsl::source.eq(source))
                 .order(pragma_devnet_dsl::block_timestamp.desc())
From 2b1fde8fd2bb5d2a8c54bce89819bb70671dae70 Mon Sep 17 00:00:00 2001
From: akhercha
Date: Wed, 9 Oct 2024 17:35:11 +0200
Subject: [PATCH 7/8] feat(PragmaGix-monitoring): moved tick

---
 src/main.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index bdcde99..38ad6dd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -302,6 +302,8 @@ pub(crate) async fn hyperlane_dispatch_monitor(
     let mut interval = interval(Duration::from_secs(5));
 
     loop {
+        interval.tick().await; // Wait for the next tick
+
         // Skip if indexer is still syncing
         if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await {
             continue;
@@ -312,7 +314,5 @@ pub(crate) async fn hyperlane_dispatch_monitor(
         ))];
         let results: Vec<_> = futures::future::join_all(tasks).await;
         log_tasks_results("Dispatch", results);
-
-        interval.tick().await; // Wait for the next tick
     }
 }
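The point of the "moved tick" change: with the tick at the bottom of the loop, every `continue` taken while the indexer was syncing skipped the wait entirely, so the loop hammered the sync check back-to-back. Ticking at the top guarantees one pause per iteration, and tokio's first tick completes immediately, so startup is not delayed. A standalone demo of that behavior:

```rust
use std::time::{Duration, Instant};
use tokio::time::interval;

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_millis(500));
    let start = Instant::now();
    for round in 0..3 {
        ticker.tick().await; // returns at ~0ms, ~500ms, ~1000ms
        let synced = false; // pretend the indexer is still catching up
        if !synced {
            println!("round {round} at {:?}: still syncing, skipping", start.elapsed());
            continue; // safe: this iteration already waited above
        }
        // ... process dispatch events here ...
    }
}
```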
From 8d9b3616af2958fce3af8d3da641f9c87ce1e423 Mon Sep 17 00:00:00 2001
From: akhercha
Date: Thu, 10 Oct 2024 14:12:45 +0200
Subject: [PATCH 8/8] feat(PragmaGix-monitoring): fixed github stuck

---
 README.md | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index b994683..c9cfabb 100644
--- a/README.md
+++ b/README.md
@@ -17,17 +17,20 @@ It then processes the data and computes the following metrics:
 - `long_tail_asset_source_deviation{network, pair, type}`: Deviation of a source from the on-chain aggregated median price given source and pair. (in percents)
 - `long_tail_asset_total_sources{network, pair, type}`: Current number of sources available for a given pair.
 - `publisher_balance{network, publisher}`: Balance of a publisher. (in ETH)
-- `vrf_balance{network}`: Balance of the VRF contract. (in ETH)
-- `vrf_requests_count{network, status}`: Number of VRF requests handled for a given network.
-- `vrf_time_since_last_handle_request{network}`: Time since the last VRF request was handled for a given network.
-- `vrf_time_since_oldest_request_in_pending_status{network}`: Time in seconds that the oldest pending VRF request has been in the pending status for a given network.
 
-Metrics specifics to our Pragma App Chain:
+Metrics specifics to our Pragma App Chains:
 
 - `dispatch_event_latest_block`: The latest block that triggered a Dispatch event from Hyperlane,
 - `dispatch_event_feed_latest_block_update`: The latest block that triggered a Dispatch event from Hyperlane for a specific Feed ID,
 - `dispatch_event_nb_feeds_updated`: The number of feeds updated per Dispatch event at a given block.
 
+Metrics specifics to Starknet (mainnet/sepolia):
+
+- `vrf_balance{network}`: Balance of the VRF contract. (in ETH)
+- `vrf_requests_count{network, status}`: Number of VRF requests handled for a given network.
+- `vrf_time_since_last_handle_request{network}`: Time since the last VRF request was handled for a given network.
+- `vrf_time_since_oldest_request_in_pending_status{network}`: Time in seconds that the oldest pending VRF request has been in the pending status for a given network.
+
 ## Shared Public Access
 
 Monitoring is not publicicly available yet but databases will soon be in read-only mode.
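End-to-end, the dispatch processing the README now documents reduces to a small piece of control flow: read the newest indexed event, if any, then fan its fields out to the three gauges. In this sketch `load_latest_event` is a hypothetical stand-in for the Diesel query in src/processing/dispatch.rs, the feed IDs are invented, and printing stands in for the Prometheus `set` calls:

```rust
struct FeedDispatch {
    block_number: i64,
    feeds_updated: Option<Vec<String>>,
}

async fn load_latest_event() -> Option<FeedDispatch> {
    Some(FeedDispatch {
        block_number: 1234,
        feeds_updated: Some(vec!["feed-btc-usd".to_string(), "feed-eth-usd".to_string()]),
    })
}

#[tokio::main]
async fn main() {
    if let Some(event) = load_latest_event().await {
        println!("dispatch_event_latest_block = {}", event.block_number);
        if let Some(feeds) = event.feeds_updated {
            println!("dispatch_event_nb_feeds_updated = {}", feeds.len());
            for feed in feeds {
                println!(
                    "dispatch_event_feed_latest_block_update{{feed_id=\"{feed}\"}} = {}",
                    event.block_number
                );
            }
        }
    }
}
```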