Skip to content

Commit

Permalink
Time since last update publisher distinction (spot/futures)
Browse files Browse the repository at this point in the history
  • Loading branch information
JordyRo1 committed Mar 1, 2024
1 parent 8c655a5 commit f7d79e2
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ lazy_static! {
"time_since_last_update_seconds",
"Time since the last update in seconds."
),
&["network", "publisher"]
&["network", "publisher","type"]
)
.unwrap();
pub static ref PAIR_PRICE: GaugeVec = register_gauge_vec!(
Expand Down
4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ pub(crate) async fn publisher_monitor(
pool.clone(),
publisher.clone(),
))),
tokio::spawn(Box::pin(processing::future::process_data_by_publisher(
pool.clone(),
publisher.clone(),
))),
]
})
.collect();
Expand Down
48 changes: 48 additions & 0 deletions src/processing/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::constants::PAIR_PRICE;
use crate::constants::PRICE_DEVIATION;
use crate::constants::PRICE_DEVIATION_SOURCE;
use crate::constants::TIME_SINCE_LAST_UPDATE_PAIR_ID;
use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER;
use crate::diesel::QueryDsl;
use crate::error::MonitoringError;
use crate::models::FutureEntry;
Expand Down Expand Up @@ -174,3 +175,50 @@ pub async fn process_data_by_pair_and_source(
Err(e) => Err(e.into()),
}
}


pub async fn process_data_by_publisher(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
publisher: String,
) -> Result<(), MonitoringError> {
let mut conn = pool
.get()
.await
.map_err(|_| MonitoringError::Connection("Failed to get connection".to_string()))?;

let config = get_config(None).await;

let result: Result<FutureEntry, _> = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::future_entry
.filter(testnet_dsl::publisher.eq(publisher.clone()))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_future_entry
.filter(mainnet_dsl::publisher.eq(publisher.clone()))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
}
};

log::info!("Processing data for publisher: {}", publisher);

match result {
Ok(data) => {
let network_env = &config.network_str();

let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "future"]);

time_labels.set(seconds_since_last_publish as f64);

Ok(())
}
Err(e) => Err(e.into()),
}
}
2 changes: 1 addition & 1 deletion src/processing/spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub async fn process_data_by_publisher(

let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher]);
TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "spot"]);

time_labels.set(seconds_since_last_publish as f64);

Expand Down

0 comments on commit f7d79e2

Please sign in to comment.