From 7493a4ebf32c1b3205f28c1bd3df8f86b9c97b60 Mon Sep 17 00:00:00 2001 From: akhercha Date: Wed, 9 Oct 2024 16:43:39 +0200 Subject: [PATCH] feat(PragmaGix-monitoring): Check sync status for Dispatch events --- src/main.rs | 13 +++++------ src/processing/common.rs | 50 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index ba444c8..bdcde99 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,7 +37,7 @@ use tokio::task::JoinHandle; use tokio::time::interval; use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType}; -use processing::common::{check_publisher_balance, indexers_are_synced}; +use processing::common::{check_publisher_balance, data_indexers_are_synced, indexers_are_synced}; use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results}; struct MonitoringTask { @@ -167,7 +167,7 @@ pub(crate) async fn onchain_monitor( interval.tick().await; // Wait for the next tick // Skip if indexer is still syncing - if wait_for_syncing && !indexers_are_synced(data_type).await { + if wait_for_syncing && !data_indexers_are_synced(data_type).await { continue; } @@ -237,7 +237,7 @@ pub(crate) async fn publisher_monitor( interval.tick().await; // Wait for the next tick // Skip if indexer is still syncing - if wait_for_syncing && !indexers_are_synced(&DataType::Spot).await { + if wait_for_syncing && !data_indexers_are_synced(&DataType::Spot).await { continue; } @@ -299,13 +299,12 @@ pub(crate) async fn hyperlane_dispatch_monitor( pool: Pool>, wait_for_syncing: bool, ) { - let mut interval = interval(Duration::from_secs(30)); + let mut interval = interval(Duration::from_secs(5)); loop { // Skip if indexer is still syncing - if wait_for_syncing { - // TODO: Adapt this for dispatch events - log::info!("Not implemented yet. TODO"); + if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await { + continue; } let tasks: Vec<_> = vec![tokio::spawn(Box::pin( diff --git a/src/processing/common.rs b/src/processing/common.rs index 5ff0514..82290dc 100644 --- a/src/processing/common.rs +++ b/src/processing/common.rs @@ -20,7 +20,7 @@ pub struct IndexerServerStatus { /// Checks if indexers of the given data type are still syncing /// Returns true if any of the indexers is still syncing -pub async fn is_syncing(data_type: &DataType) -> Result { +pub async fn data_is_syncing(data_type: &DataType) -> Result { let config = get_config(None).await; let table_name = config.table_name(data_type.clone()); @@ -44,8 +44,8 @@ pub async fn is_syncing(data_type: &DataType) -> Result { } /// Check if the indexers are still syncing -pub async fn indexers_are_synced(data_type: &DataType) -> bool { - match is_syncing(data_type).await { +pub async fn data_indexers_are_synced(data_type: &DataType) -> bool { + match data_is_syncing(data_type).await { Ok(true) => { log::info!("[{data_type}] Indexers are still syncing ♻️"); false @@ -64,6 +64,50 @@ pub async fn indexers_are_synced(data_type: &DataType) -> bool { } } +/// Checks if indexers of the given data type are still syncing +/// Returns true if any of the indexers is still syncing +pub async fn is_syncing(table_name: &str) -> Result { + let config = get_config(None).await; + + let status = get_sink_status(table_name, config.indexer_url()).await?; + + let provider = &config.network().provider; + + let blocks_left = blocks_left(&status, provider).await?; + + // Update the prometheus metric + INDEXER_BLOCKS_LEFT + .with_label_values(&[ + (&config.network().name).into(), + &table_name.to_string().to_ascii_lowercase(), + ]) + .set(blocks_left.unwrap_or(0) as i64); + + // Check if any indexer is still syncing + Ok(blocks_left.is_some()) +} + +/// Check if the indexers are still syncing +pub async fn indexers_are_synced(table_name: &str) -> bool { + match is_syncing(table_name).await { + Ok(true) => { + log::info!("[{table_name}] Indexers are still syncing ♻️"); + false + } + Ok(false) => { + log::info!("[{table_name}] Indexers are synced ✅"); + true + } + Err(e) => { + log::error!( + "[{table_name}] Failed to check if indexers are syncing: {:?}", + e + ); + false + } + } +} + /// Returns the status of the indexer /// /// # Arguments