feat: PragmaGix monitoring #43

Merged · 8 commits · Oct 10, 2024
Changes from all commits
9 changes: 9 additions & 0 deletions README.md
@@ -17,6 +17,15 @@ It then processes the data and computes the following metrics:
- `long_tail_asset_source_deviation{network, pair, type}`: Deviation of a source from the on-chain aggregated median price for a given source and pair. (in percent)
- `long_tail_asset_total_sources{network, pair, type}`: Current number of sources available for a given pair.
- `publisher_balance{network, publisher}`: Balance of a publisher. (in ETH)

Metrics specific to our Pragma App Chains:

- `dispatch_event_latest_block{network}`: The latest block that triggered a Dispatch event from Hyperlane.
- `dispatch_event_feed_latest_block_update{network, feed_id}`: The latest block that triggered a Dispatch event from Hyperlane for a specific Feed ID.
- `dispatch_event_nb_feeds_updated{network, block}`: The number of feeds updated per Dispatch event at a given block.

Metrics specific to Starknet (mainnet/sepolia):

- `vrf_balance{network}`: Balance of the VRF contract. (in ETH)
- `vrf_requests_count{network, status}`: Number of VRF requests handled for a given network.
- `vrf_time_since_last_handle_request{network}`: Time since the last VRF request was handled for a given network.
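
For illustration, a scrape of the service's Prometheus endpoint would then include samples such as the following (hypothetical values and feed ID; label sets as defined above):

    dispatch_event_latest_block{network="pragma-devnet"} 184210
    dispatch_event_feed_latest_block_update{network="pragma-devnet",feed_id="0x4254432f555344"} 184210
    dispatch_event_nb_feeds_updated{network="pragma-devnet",block="184210"} 7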
8 changes: 8 additions & 0 deletions src/config.rs
@@ -31,6 +31,8 @@ pub enum NetworkName {
Mainnet,
#[strum(ascii_case_insensitive)]
Testnet,
#[strum(ascii_case_insensitive, serialize = "pragma-devnet")]
PragmaDevnet,
}

#[derive(Debug, EnumString, IntoStaticStr, PartialEq, Eq, Hash, Clone, Display)]
@@ -162,12 +164,18 @@ impl Config {
match self.network.name {
NetworkName::Mainnet => format!("mainnet_{}", table_name),
NetworkName::Testnet => table_name.to_string(),
NetworkName::PragmaDevnet => format!("pragma_devnet_{}", table_name),
}
}

pub fn all_publishers(&self) -> &HashMap<String, Felt> {
&self.publishers
}

/// Check if the configuration is set for a Pragma Chain
pub fn is_pragma_chain(&self) -> bool {
matches!(self.network.name, NetworkName::PragmaDevnet)
}
}
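
Assuming `NetworkName` derives strum's `EnumString` (the strum attributes above imply it), the new `serialize = "pragma-devnet"` value lets the kebab-case chain name be parsed straight from configuration, and `ascii_case_insensitive` makes the match case-insensitive. A minimal sketch under that assumption, not part of the diff:

    use std::str::FromStr;

    // Both spellings resolve thanks to ascii_case_insensitive.
    assert_eq!(
        NetworkName::from_str("pragma-devnet").unwrap(),
        NetworkName::PragmaDevnet
    );
    assert_eq!(
        NetworkName::from_str("PRAGMA-DEVNET").unwrap(),
        NetworkName::PragmaDevnet
    );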

#[derive(Debug, Clone)]
21 changes: 21 additions & 0 deletions src/constants.rs
@@ -207,6 +207,27 @@ lazy_static! {
&["network"]
)
.unwrap();
pub static ref DISPATCH_EVENT_LATEST_BLOCK: GaugeVec = register_gauge_vec!(
opts!(
"dispatch_event_latest_block",
"The latest block that triggered a Dispatch event from Hyperlane"
),
&["network"]
).unwrap();
pub static ref DISPATCH_EVENT_FEED_LATEST_BLOCK_UPDATE: GaugeVec = register_gauge_vec!(
opts!(
"dispatch_event_feed_latest_block_update",
"The latest block that triggered a Dispatch event from Hyperlane for a specific Feed ID"
),
&["network", "feed_id"]
).unwrap();
pub static ref DISPATCH_EVENT_NB_FEEDS_UPDATED: GaugeVec = register_gauge_vec!(
opts!(
"dispatch_event_nb_feeds_updated",
"The number of feeds updated per Dispatch event at a given block"
),
&["network", "block"]
).unwrap();
}
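
To make the intended use of these new gauges concrete, here is a hedged sketch of an update routine; the real logic lives in `processing::dispatch::process_dispatch_events` (added by this PR but not shown in this view), and `record_dispatch_metrics` is a hypothetical name. Field names follow the `FeedDispatch` model below:

    use crate::constants::{
        DISPATCH_EVENT_FEED_LATEST_BLOCK_UPDATE, DISPATCH_EVENT_LATEST_BLOCK,
        DISPATCH_EVENT_NB_FEEDS_UPDATED,
    };
    use crate::models::FeedDispatch;

    fn record_dispatch_metrics(network: &str, dispatch: &FeedDispatch) {
        // Latest block that emitted a Dispatch event on this network.
        DISPATCH_EVENT_LATEST_BLOCK
            .with_label_values(&[network])
            .set(dispatch.block_number as f64);

        if let Some(feeds) = &dispatch.feeds_updated {
            let block = dispatch.block_number.to_string();

            // How many feeds this Dispatch event updated at this block.
            DISPATCH_EVENT_NB_FEEDS_UPDATED
                .with_label_values(&[network, &block])
                .set(feeds.len() as f64);

            // Latest update block, per feed.
            for feed_id in feeds {
                DISPATCH_EVENT_FEED_LATEST_BLOCK_UPDATE
                    .with_label_values(&[network, feed_id])
                    .set(dispatch.block_number as f64);
            }
        }
    }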

#[allow(unused)]
123 changes: 83 additions & 40 deletions src/main.rs
@@ -25,18 +25,25 @@ mod utils;
#[cfg(test)]
mod tests;

use std::collections::HashMap;
use std::time::Duration;
use std::{env, vec};

use deadpool::managed::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use dotenv::dotenv;
use tokio::task::JoinHandle;
use tokio::time::interval;

use config::{get_config, init_long_tail_asset_configuration, periodic_config_update, DataType};
-use processing::common::{check_publisher_balance, indexers_are_synced};
-use utils::{is_long_tail_asset, log_tasks_results};
+use processing::common::{check_publisher_balance, data_indexers_are_synced, indexers_are_synced};
+use utils::{is_long_tail_asset, log_monitoring_results, log_tasks_results};

struct MonitoringTask {
name: String,
handle: JoinHandle<()>,
}

#[tokio::main]
async fn main() {
@@ -56,47 +63,61 @@ async fn main() {

// Set the long tail asset list
init_long_tail_asset_configuration();

-    // Monitor spot/future in parallel
-    let spot_monitoring = tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot));
-    let future_monitoring = tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future));
-    let publisher_monitoring = tokio::spawn(publisher_monitor(pool.clone(), false));
-    let api_monitoring = tokio::spawn(api_monitor());
-    let vrf_monitoring = tokio::spawn(vrf_monitor(pool.clone()));
-
-    let config_update = tokio::spawn(periodic_config_update());
-
-    // Wait for the monitoring to finish
-    let results = futures::future::join_all(vec![
-        spot_monitoring,
-        future_monitoring,
-        api_monitoring,
-        publisher_monitoring,
-        vrf_monitoring,
-        config_update,
-    ])
-    .await;
-
-    // Check if any of the monitoring tasks failed
-    if let Err(e) = &results[0] {
-        log::error!("[SPOT] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[1] {
-        log::error!("[FUTURE] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[2] {
-        log::error!("[API] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[3] {
-        log::error!("[PUBLISHERS] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[4] {
-        log::error!("[VRF] Monitoring failed: {:?}", e);
-    }
-    if let Err(e) = &results[5] {
-        log::error!("[CONFIG] Config Update failed: {:?}", e);
-    }
+    let monitoring_tasks = spawn_monitoring_tasks(pool.clone(), &monitoring_config).await;
+    handle_task_results(monitoring_tasks).await;
 }
+
+async fn spawn_monitoring_tasks(
+    pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
+    monitoring_config: &config::Config,
+) -> Vec<MonitoringTask> {
+    let mut tasks = vec![
+        MonitoringTask {
+            name: "Config Update".to_string(),
+            handle: tokio::spawn(periodic_config_update()),
+        },
+        MonitoringTask {
+            name: "Spot Monitoring".to_string(),
+            handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Spot)),
+        },
+        MonitoringTask {
+            name: "Future Monitoring".to_string(),
+            handle: tokio::spawn(onchain_monitor(pool.clone(), true, &DataType::Future)),
+        },
+        MonitoringTask {
+            name: "Publisher Monitoring".to_string(),
+            handle: tokio::spawn(publisher_monitor(pool.clone(), false)),
+        },
+    ];
+
+    if monitoring_config.is_pragma_chain() {
+        tasks.push(MonitoringTask {
+            name: "Hyperlane Dispatches Monitoring".to_string(),
+            handle: tokio::spawn(hyperlane_dispatch_monitor(pool.clone(), true)),
+        });
+    } else {
+        tasks.push(MonitoringTask {
+            name: "API Monitoring".to_string(),
+            handle: tokio::spawn(api_monitor()),
+        });
+        tasks.push(MonitoringTask {
+            name: "VRF Monitoring".to_string(),
+            handle: tokio::spawn(vrf_monitor(pool.clone())),
+        });
+    }
+
+    tasks
+}
+
+async fn handle_task_results(tasks: Vec<MonitoringTask>) {
+    let mut results = HashMap::new();
+    for task in tasks {
+        let result = task.handle.await;
+        results.insert(task.name, result);
+    }
+    log_monitoring_results(results);
+}

pub(crate) async fn api_monitor() {
@@ -146,7 +167,7 @@ pub(crate) async fn onchain_monitor(
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
-if wait_for_syncing && !indexers_are_synced(data_type).await {
+if wait_for_syncing && !data_indexers_are_synced(data_type).await {
continue;
}

@@ -216,7 +237,7 @@ pub(crate) async fn publisher_monitor(
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
-if wait_for_syncing && !indexers_are_synced(&DataType::Spot).await {
+if wait_for_syncing && !data_indexers_are_synced(&DataType::Spot).await {
continue;
}

@@ -273,3 +294,25 @@ pub(crate) async fn vrf_monitor(pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>) {
log_tasks_results("VRF", results);
}
}

pub(crate) async fn hyperlane_dispatch_monitor(
pool: Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
wait_for_syncing: bool,
) {
let mut interval = interval(Duration::from_secs(5));

loop {
interval.tick().await; // Wait for the next tick

// Skip if indexer is still syncing
if wait_for_syncing && !indexers_are_synced("pragma_devnet_dispatch_event").await {
continue;
}

let tasks: Vec<_> = vec![tokio::spawn(Box::pin(
processing::dispatch::process_dispatch_events(pool.clone()),
))];
let results: Vec<_> = futures::future::join_all(tasks).await;
log_tasks_results("Dispatch", results);
}
}
53 changes: 33 additions & 20 deletions src/models.rs
@@ -90,32 +90,45 @@ pub struct VrfRequest {
pub data_id: String,
}


#[derive(Queryable, Debug, QueryableByName, Selectable)]
#[diesel(primary_key(data_id))]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(table_name = crate::schema::oo_requests)]
pub struct OORequest {
    pub network: String,
    pub data_id: String,
    pub assertion_id: String,
    pub domain_id: String,
    pub claim: String,
    pub asserter: String,
    pub disputer: Option<String>,
    pub disputed: Option<bool>,
    pub dispute_id: Option<String>,
    pub callback_recipient: String,
    pub escalation_manager: String,
    pub caller: String,
    pub expiration_timestamp: NaiveDateTime,
    pub settled: Option<bool>,
    pub settlement_resolution: Option<bool>,
    pub settle_caller: Option<String>,
    pub currency: String,
    pub bond: BigDecimal,
    pub _cursor: (Bound<i64>, Bound<i64>),
    pub identifier: String,
    pub updated_at: NaiveDateTime,
    pub updated_at_tx: String,
}

#[derive(Queryable, Debug, QueryableByName, Selectable)]
#[diesel(table_name = crate::schema::pragma_devnet_dispatch_event)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct FeedDispatch {
    pub network: String,
    pub block_hash: String,
    pub block_number: i64,
    pub block_timestamp: NaiveDateTime,
    pub transaction_hash: String,
    pub hyperlane_message_nonce: BigDecimal,
    pub feeds_updated: Option<Vec<String>>,
    pub _cursor: i64,
}
50 changes: 47 additions & 3 deletions src/processing/common.rs
@@ -20,7 +20,7 @@ pub struct IndexerServerStatus {

/// Checks if indexers of the given data type are still syncing
/// Returns true if any of the indexers is still syncing
-pub async fn is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
+pub async fn data_is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
let config = get_config(None).await;

let table_name = config.table_name(data_type.clone());
@@ -44,8 +44,8 @@ pub async fn is_syncing(data_type: &DataType) -> Result<bool, MonitoringError> {
}

/// Check if the indexers are still syncing
-pub async fn indexers_are_synced(data_type: &DataType) -> bool {
-    match is_syncing(data_type).await {
+pub async fn data_indexers_are_synced(data_type: &DataType) -> bool {
+    match data_is_syncing(data_type).await {
Ok(true) => {
log::info!("[{data_type}] Indexers are still syncing ♻️");
false
Expand All @@ -64,6 +64,50 @@ pub async fn indexers_are_synced(data_type: &DataType) -> bool {
}
}

/// Checks if the indexer filling the given table is still syncing
/// Returns true if it is still syncing
pub async fn is_syncing(table_name: &str) -> Result<bool, MonitoringError> {
let config = get_config(None).await;

let status = get_sink_status(table_name, config.indexer_url()).await?;

let provider = &config.network().provider;

let blocks_left = blocks_left(&status, provider).await?;

// Update the prometheus metric
INDEXER_BLOCKS_LEFT
.with_label_values(&[
(&config.network().name).into(),
&table_name.to_string().to_ascii_lowercase(),
])
.set(blocks_left.unwrap_or(0) as i64);

// Check if any indexer is still syncing
Ok(blocks_left.is_some())
}

/// Check if the indexers are still syncing
pub async fn indexers_are_synced(table_name: &str) -> bool {
match is_syncing(table_name).await {
Ok(true) => {
log::info!("[{table_name}] Indexers are still syncing ♻️");
false
}
Ok(false) => {
log::info!("[{table_name}] Indexers are synced ✅");
true
}
Err(e) => {
log::error!(
"[{table_name}] Failed to check if indexers are syncing: {:?}",
e
);
false
}
}
}

/// Returns the status of the indexer
///
/// # Arguments