From f7d79e2cd07b1cf92462ba7019d0795752e60efd Mon Sep 17 00:00:00 2001 From: JordyRo1 Date: Fri, 1 Mar 2024 12:20:28 +0100 Subject: [PATCH] Time since last update publisher distinction (spot/futures) --- src/constants.rs | 2 +- src/main.rs | 4 ++++ src/processing/future.rs | 48 ++++++++++++++++++++++++++++++++++++++++ src/processing/spot.rs | 2 +- 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/constants.rs b/src/constants.rs index 7808e8f..e7294fe 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -21,7 +21,7 @@ lazy_static! { "time_since_last_update_seconds", "Time since the last update in seconds." ), - &["network", "publisher"] + &["network", "publisher","type"] ) .unwrap(); pub static ref PAIR_PRICE: GaugeVec = register_gauge_vec!( diff --git a/src/main.rs b/src/main.rs index c02c44e..a355cd8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -257,6 +257,10 @@ pub(crate) async fn publisher_monitor( pool.clone(), publisher.clone(), ))), + tokio::spawn(Box::pin(processing::future::process_data_by_publisher( + pool.clone(), + publisher.clone(), + ))), ] }) .collect(); diff --git a/src/processing/future.rs b/src/processing/future.rs index 2cb2eb9..b103139 100644 --- a/src/processing/future.rs +++ b/src/processing/future.rs @@ -10,6 +10,7 @@ use crate::constants::PAIR_PRICE; use crate::constants::PRICE_DEVIATION; use crate::constants::PRICE_DEVIATION_SOURCE; use crate::constants::TIME_SINCE_LAST_UPDATE_PAIR_ID; +use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER; use crate::diesel::QueryDsl; use crate::error::MonitoringError; use crate::models::FutureEntry; @@ -174,3 +175,50 @@ pub async fn process_data_by_pair_and_source( Err(e) => Err(e.into()), } } + + +pub async fn process_data_by_publisher( + pool: deadpool::managed::Pool>, + publisher: String, +) -> Result<(), MonitoringError> { + let mut conn = pool + .get() + .await + .map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?; + + let config = get_config(None).await; + + let result: Result = match config.network().name { + NetworkName::Testnet => { + testnet_dsl::future_entry + .filter(testnet_dsl::publisher.eq(publisher.clone())) + .order(testnet_dsl::block_timestamp.desc()) + .first(&mut conn) + .await + } + NetworkName::Mainnet => { + mainnet_dsl::mainnet_future_entry + .filter(mainnet_dsl::publisher.eq(publisher.clone())) + .order(mainnet_dsl::block_timestamp.desc()) + .first(&mut conn) + .await + } + }; + + log::info!("Processing data for publisher: {}", publisher); + + match result { + Ok(data) => { + let network_env = &config.network_str(); + + let seconds_since_last_publish = time_since_last_update(&data); + let time_labels = + TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "future"]); + + time_labels.set(seconds_since_last_publish as f64); + + Ok(()) + } + Err(e) => Err(e.into()), + } +} \ No newline at end of file diff --git a/src/processing/spot.rs b/src/processing/spot.rs index 9935efb..70505ae 100644 --- a/src/processing/spot.rs +++ b/src/processing/spot.rs @@ -207,7 +207,7 @@ pub async fn process_data_by_publisher( let seconds_since_last_publish = time_since_last_update(&data); let time_labels = - TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher]); + TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "spot"]); time_labels.set(seconds_since_last_publish as f64);