Skip to content

Commit

Permalink
feat(PragmaGix-monitoring): Check sync status for Dispatch events
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Oct 9, 2024
1 parent d5d0e8a commit 7493a4e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
13 changes: 6 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::task::JoinHandle;
use tokio::time::interval;

use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
use processing::common::{check_publisher_balance, indexers_are_synced};
use processing::common::{check_publisher_balance, data_indexers_are_synced, indexers_are_synced};
use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};

struct MonitoringTask {
Expand Down Expand Up @@ -167,7 +167,7 @@ pub(crate) async fn onchain_monitor(
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
if wait_for_syncing && !indexers_are_synced(data_type).await {
if wait_for_syncing && !data_indexers_are_synced(data_type).await {
continue;
}

Expand Down Expand Up @@ -237,7 +237,7 @@ pub(crate) async fn publisher_monitor(
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
if wait_for_syncing && !indexers_are_synced(&DataType::Spot).await {
if wait_for_syncing && !data_indexers_are_synced(&DataType::Spot).await {
continue;
}

Expand Down Expand Up @@ -299,13 +299,12 @@ pub(crate) async fn hyperlane_dispatch_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
) {
let mut interval = interval(Duration::from_secs(30));
let mut interval = interval(Duration::from_secs(5));

loop {
// Skip if indexer is still syncing
if wait_for_syncing {
// TODO: Adapt this for dispatch events
log::info!("Not implemented yet. TODO");
if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await {
continue;
}

let tasks: Vec<_> = vec![tokio::spawn(Box::pin(
Expand Down
50 changes: 47 additions & 3 deletions src/processing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct IndexerServerStatus {

/// Checks if indexers of the given data type are still syncing
/// Returns true if any of the indexers is still syncing
pub async fn is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
pub async fn data_is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
let config = get_config(None).await;

let table_name = config.table_name(data_type.clone());
Expand All @@ -44,8 +44,8 @@ pub async fn is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
}

/// Check if the indexers are still syncing
pub async fn indexers_are_synced(data_type: &DataType) -> bool {
match is_syncing(data_type).await {
pub async fn data_indexers_are_synced(data_type: &DataType) -> bool {
match data_is_syncing(data_type).await {
Ok(true) => {
log::info!("[{data_type}] Indexers are still syncing ♻️");
false
Expand All @@ -64,6 +64,50 @@ pub async fn indexers_are_synced(data_type: &DataType) -> bool {
}
}

/// Checks if indexers of the given data type are still syncing
/// Returns true if any of the indexers is still syncing
pub async fn is_syncing(table_name: &str) -> Result<bool, MonitoringError> {
let config = get_config(None).await;

let status = get_sink_status(table_name, config.indexer_url()).await?;

let provider = &config.network().provider;

let blocks_left = blocks_left(&status, provider).await?;

// Update the prometheus metric
INDEXER_BLOCKS_LEFT
.with_label_values(&[
(&config.network().name).into(),
&table_name.to_string().to_ascii_lowercase(),
])
.set(blocks_left.unwrap_or(0) as i64);

// Check if any indexer is still syncing
Ok(blocks_left.is_some())
}

/// Check if the indexers are still syncing
pub async fn indexers_are_synced(table_name: &str) -> bool {
match is_syncing(table_name).await {
Ok(true) => {
log::info!("[{table_name}] Indexers are still syncing ♻️");
false
}
Ok(false) => {
log::info!("[{table_name}] Indexers are synced ✅");
true
}
Err(e) => {
log::error!(
"[{table_name}] Failed to check if indexers are syncing: {:?}",
e
);
false
}
}
}

/// Returns the status of the indexer
///
/// # Arguments
Expand Down

0 comments on commit 7493a4e

Please sign in to comment.