Skip to content

Commit

Permalink
✨ Publishers Balance Monitoring (#12)
Browse files Browse the repository at this point in the history
* init

* add actual publishers address

* lint

* fix: fmt

* clippy

* fix: minor refactoring

* fix: renaming + update readme

---------

Co-authored-by: 0xevolve <[email protected]>
  • Loading branch information
JordyRo1 and EvolveArt authored Jan 23, 2024
1 parent 7229fb7 commit 29e3a46
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ It then processes the data and computes the following metrics:
- `time_since_last_update_pair_id{network, pair, type}`: Time since an update has been published for a given pair. (in seconds)
- `price_deviation{network, pair, source, type}`: Deviation of the price from a reference price (DefiLlama API) given source and pair. (in percents)
- `price_deviation_source{network, pair, source, type}`: Deviation of the price from the on-chain aggregated median price given source and pair. (in percents)
- `publisher_balance{network, publisher}`: Balance of a publisher. (in ETH)

## Shared Public Access

Expand Down
30 changes: 27 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct DataInfo {
#[allow(unused)]
pub struct Config {
data_info: HashMap<DataType, DataInfo>,
publishers: Vec<String>,
publishers: HashMap<String, FieldElement>,
network: Network,
indexer_url: String,
}
Expand Down Expand Up @@ -129,6 +129,10 @@ impl Config {
NetworkName::Testnet => table_name.to_string(),
}
}

pub fn all_publishers(&self) -> &HashMap<String, FieldElement> {
&self.publishers
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -186,7 +190,7 @@ pub async fn config_force_init(config_input: ConfigInput) {
async fn init_publishers(
rpc_client: &JsonRpcClient<HttpTransport>,
oracle_address: FieldElement,
) -> (Vec<String>, FieldElement) {
) -> (HashMap<String, FieldElement>, FieldElement) {
// Fetch publisher registry address
let publisher_registry_address = *rpc_client
.call(
Expand Down Expand Up @@ -232,7 +236,27 @@ async fn init_publishers(
.filter(|publisher| !excluded_publishers.contains(publisher))
.collect::<Vec<String>>();

(publishers, publisher_registry_address)
let mut publishers_map: HashMap<String, FieldElement> = HashMap::new();
for publisher in publishers {
let field_publisher =
cairo_short_string_to_felt(&publisher).expect("Failed to parse publisher");
let publisher_address = *rpc_client
.call(
FunctionCall {
contract_address: publisher_registry_address,
entry_point_selector: selector!("get_publisher_address"), // Replace with actual function name
calldata: vec![field_publisher],
},
BlockId::Tag(BlockTag::Latest),
)
.await
.expect("failed to get publisher address")
.first()
.unwrap();

publishers_map.insert(publisher, publisher_address);
}
(publishers_map, publisher_registry_address)
}

async fn init_spot_config(
Expand Down
9 changes: 9 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ lazy_static! {
&["network", "type"]
)
.unwrap();
pub static ref PUBLISHER_BALANCE: GaugeVec = register_gauge_vec!(
opts!("publisher_balance", "Balance of the publisher in ETH"),
&["network", "publisher"]
)
.unwrap();
pub static ref API_PRICE_DEVIATION: GaugeVec = register_gauge_vec!(
opts!(
"api_price_deviation",
Expand Down Expand Up @@ -101,3 +106,7 @@ lazy_static! {
)
.unwrap();
}

pub const FEE_TOKEN_DECIMALS: i32 = 18;
pub const FEE_TOKEN_ADDRESS: &str =
"0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7";
49 changes: 44 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
extern crate diesel;
extern crate dotenv;

use config::get_config;
use config::DataType;
use config::{get_config, DataType};
use diesel_async::pooled_connection::deadpool::*;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
Expand All @@ -12,7 +11,7 @@ use std::env;
use std::time::Duration;
use tokio::time::interval;

use crate::processing::common::is_syncing;
use crate::processing::common::{check_publisher_balance, is_syncing};

// Configuration
mod config;
Expand Down Expand Up @@ -59,11 +58,17 @@ async fn main() {
let spot_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Spot));
let future_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Future));

let balance_monitoring = tokio::spawn(balance_monitor());
let api_monitoring = tokio::spawn(monitor_api());

// Wait for the monitoring to finish
let results =
futures::future::join_all(vec![spot_monitoring, future_monitoring, api_monitoring]).await;
let results = futures::future::join_all(vec![
spot_monitoring,
future_monitoring,
api_monitoring,
balance_monitoring,
])
.await;

// Check if any of the monitoring tasks failed
if let Err(e) = &results[0] {
Expand All @@ -75,6 +80,9 @@ async fn main() {
if let Err(e) = &results[2] {
log::error!("[API] Monitoring failed: {:?}", e);
}
if let Err(e) = &results[3] {
log::error!("[BALANCE] Monitoring failed: {:?}", e);
}
}

pub(crate) async fn monitor_api() {
Expand Down Expand Up @@ -197,3 +205,34 @@ pub(crate) async fn monitor(
}
}
}

pub(crate) async fn balance_monitor() {
log::info!("[PUBLISHERS] Monitoring Publishers..");
let mut interval = interval(Duration::from_secs(30));
let monitoring_config: arc_swap::Guard<std::sync::Arc<config::Config>> = get_config(None).await;

loop {
interval.tick().await; // Wait for the next tick

let tasks: Vec<_> = monitoring_config
.all_publishers()
.iter()
.map(|(name, address)| {
tokio::spawn(Box::pin(check_publisher_balance(name.clone(), *address)))
})
.collect();

let results: Vec<_> = futures::future::join_all(tasks).await;

// Process or output the results
for result in &results {
match result {
Ok(data) => match data {
Ok(_) => log::info!("[PUBLISHERS]: Task finished successfully",),
Err(e) => log::error!("[PUBLISHERS]: Task failed with error: {e}"),
},
Err(e) => log::error!("[PUBLISHERS]: Task failed with error: {:?}", e),
}
}
}
}
2 changes: 2 additions & 0 deletions src/monitoring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod price_deviation;
pub mod publisher_balance;
pub mod source_deviation;
pub mod time_since_last_update;

pub use price_deviation::price_deviation;
pub use publisher_balance::publisher_balance;
pub use source_deviation::source_deviation;
pub use time_since_last_update::time_since_last_update;
40 changes: 40 additions & 0 deletions src/monitoring/publisher_balance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use bigdecimal::ToPrimitive;
use starknet::{
core::types::{BlockId, BlockTag, FieldElement, FunctionCall},
macros::selector,
providers::Provider,
};

use crate::constants::{FEE_TOKEN_ADDRESS, FEE_TOKEN_DECIMALS};
use crate::{config::get_config, error::MonitoringError};

/// Returns the balance of a given publisher address
/// Note: Currently only reads ETH balance
pub async fn publisher_balance(publisher_address: FieldElement) -> Result<f64, MonitoringError> {
let config = get_config(None).await;

let client = &config.network().provider;
let token_balance = client
.call(
FunctionCall {
contract_address: FieldElement::from_hex_be(FEE_TOKEN_ADDRESS)
.expect("failed to convert token address"),
entry_point_selector: selector!("balanceOf"),
calldata: vec![publisher_address],
},
BlockId::Tag(BlockTag::Latest),
)
.await
.map_err(|e| MonitoringError::OnChain(e.to_string()))?;

let on_chain_balance = token_balance
.first()
.ok_or(MonitoringError::OnChain("No data".to_string()))?
.to_big_decimal(FEE_TOKEN_DECIMALS)
.to_f64()
.ok_or(MonitoringError::Conversion(
"Failed to convert to f64".to_string(),
))?;

Ok(on_chain_balance)
}
27 changes: 21 additions & 6 deletions src/processing/common.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider};

use crate::monitoring::publisher_balance;
use crate::{
config::{get_config, DataType},
constants::INDEXER_BLOCKS_LEFT,
constants::{INDEXER_BLOCKS_LEFT, PUBLISHER_BALANCE},
error::MonitoringError,
};

use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use starknet::core::types::FieldElement;
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider};
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct IndexerServerStatus {
pub status: i32,
Expand Down Expand Up @@ -162,3 +162,18 @@ pub async fn query_pragma_api(
))),
}
}

pub async fn check_publisher_balance(
publisher: String,
publisher_address: FieldElement,
) -> Result<(), MonitoringError> {
let config = get_config(None).await;
let balance = publisher_balance(publisher_address).await?;

let network_env = &config.network_str();

PUBLISHER_BALANCE
.with_label_values(&[network_env, &publisher])
.set(balance);
Ok(())
}

0 comments on commit 29e3a46

Please sign in to comment.