Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Publishers Balance Monitoring #12

Merged
merged 7 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ It then processes the data and computes the following metrics:
- `time_since_last_update_pair_id{network, pair, type}`: Time since an update has been published for a given pair. (in seconds)
- `price_deviation{network, pair, source, type}`: Deviation of the price from a reference price (DefiLlama API) given source and pair. (in percents)
- `price_deviation_source{network, pair, source, type}`: Deviation of the price from the on-chain aggregated median price given source and pair. (in percents)
- `publisher_balance{network, publisher}`: Balance of a publisher. (in ETH)

## Shared Public Access

Expand Down
30 changes: 27 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct DataInfo {
#[allow(unused)]
pub struct Config {
data_info: HashMap<DataType, DataInfo>,
publishers: Vec<String>,
publishers: HashMap<String, FieldElement>,
network: Network,
indexer_url: String,
}
Expand Down Expand Up @@ -129,6 +129,10 @@ impl Config {
NetworkName::Testnet => table_name.to_string(),
}
}

pub fn all_publishers(&self) -> &HashMap<String, FieldElement> {
&self.publishers
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -186,7 +190,7 @@ pub async fn config_force_init(config_input: ConfigInput) {
async fn init_publishers(
rpc_client: &JsonRpcClient<HttpTransport>,
oracle_address: FieldElement,
) -> (Vec<String>, FieldElement) {
) -> (HashMap<String, FieldElement>, FieldElement) {
// Fetch publisher registry address
let publisher_registry_address = *rpc_client
.call(
Expand Down Expand Up @@ -232,7 +236,27 @@ async fn init_publishers(
.filter(|publisher| !excluded_publishers.contains(publisher))
.collect::<Vec<String>>();

(publishers, publisher_registry_address)
let mut publishers_map: HashMap<String, FieldElement> = HashMap::new();
for publisher in publishers {
let field_publisher =
cairo_short_string_to_felt(&publisher).expect("Failed to parse publisher");
let publisher_address = *rpc_client
.call(
FunctionCall {
contract_address: publisher_registry_address,
entry_point_selector: selector!("get_publisher_address"), // Replace with actual function name
calldata: vec![field_publisher],
},
BlockId::Tag(BlockTag::Latest),
)
.await
.expect("failed to get publisher address")
.first()
.unwrap();

publishers_map.insert(publisher, publisher_address);
}
(publishers_map, publisher_registry_address)
}

async fn init_spot_config(
Expand Down
9 changes: 9 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ lazy_static! {
&["network", "type"]
)
.unwrap();
pub static ref PUBLISHER_BALANCE: GaugeVec = register_gauge_vec!(
opts!("publisher_balance", "Balance of the publisher in ETH"),
&["network", "publisher"]
)
.unwrap();
pub static ref API_PRICE_DEVIATION: GaugeVec = register_gauge_vec!(
opts!(
"api_price_deviation",
Expand Down Expand Up @@ -101,3 +106,7 @@ lazy_static! {
)
.unwrap();
}

pub const FEE_TOKEN_DECIMALS: i32 = 18;
pub const FEE_TOKEN_ADDRESS: &str =
"0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7";
49 changes: 44 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
extern crate diesel;
extern crate dotenv;

use config::get_config;
use config::DataType;
use config::{get_config, DataType};
use diesel_async::pooled_connection::deadpool::*;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
Expand All @@ -12,7 +11,7 @@ use std::env;
use std::time::Duration;
use tokio::time::interval;

use crate::processing::common::is_syncing;
use crate::processing::common::{check_publisher_balance, is_syncing};

// Configuration
mod config;
Expand Down Expand Up @@ -59,11 +58,17 @@ async fn main() {
let spot_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Spot));
let future_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Future));

let balance_monitoring = tokio::spawn(balance_monitor());
let api_monitoring = tokio::spawn(monitor_api());

// Wait for the monitoring to finish
let results =
futures::future::join_all(vec![spot_monitoring, future_monitoring, api_monitoring]).await;
let results = futures::future::join_all(vec![
spot_monitoring,
future_monitoring,
api_monitoring,
balance_monitoring,
])
.await;

// Check if any of the monitoring tasks failed
if let Err(e) = &results[0] {
Expand All @@ -75,6 +80,9 @@ async fn main() {
if let Err(e) = &results[2] {
log::error!("[API] Monitoring failed: {:?}", e);
}
if let Err(e) = &results[3] {
log::error!("[BALANCE] Monitoring failed: {:?}", e);
}
}

pub(crate) async fn monitor_api() {
Expand Down Expand Up @@ -197,3 +205,34 @@ pub(crate) async fn monitor(
}
}
}

pub(crate) async fn balance_monitor() {
log::info!("[PUBLISHERS] Monitoring Publishers..");
let mut interval = interval(Duration::from_secs(30));
let monitoring_config: arc_swap::Guard<std::sync::Arc<config::Config>> = get_config(None).await;

loop {
interval.tick().await; // Wait for the next tick

let tasks: Vec<_> = monitoring_config
.all_publishers()
.iter()
.map(|(name, address)| {
tokio::spawn(Box::pin(check_publisher_balance(name.clone(), *address)))
})
.collect();

let results: Vec<_> = futures::future::join_all(tasks).await;

// Process or output the results
for result in &results {
match result {
Ok(data) => match data {
Ok(_) => log::info!("[PUBLISHERS]: Task finished successfully",),
Err(e) => log::error!("[PUBLISHERS]: Task failed with error: {e}"),
},
Err(e) => log::error!("[PUBLISHERS]: Task failed with error: {:?}", e),
}
}
}
}
2 changes: 2 additions & 0 deletions src/monitoring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod price_deviation;
pub mod publisher_balance;
pub mod source_deviation;
pub mod time_since_last_update;

pub use price_deviation::price_deviation;
pub use publisher_balance::publisher_balance;
pub use source_deviation::source_deviation;
pub use time_since_last_update::time_since_last_update;
40 changes: 40 additions & 0 deletions src/monitoring/publisher_balance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use bigdecimal::ToPrimitive;
use starknet::{
core::types::{BlockId, BlockTag, FieldElement, FunctionCall},
macros::selector,
providers::Provider,
};

use crate::constants::{FEE_TOKEN_ADDRESS, FEE_TOKEN_DECIMALS};
use crate::{config::get_config, error::MonitoringError};

/// Returns the balance of a given publisher address
/// Note: Currently only reads ETH balance
pub async fn publisher_balance(publisher_address: FieldElement) -> Result<f64, MonitoringError> {
let config = get_config(None).await;

let client = &config.network().provider;
let token_balance = client
.call(
FunctionCall {
contract_address: FieldElement::from_hex_be(FEE_TOKEN_ADDRESS)
.expect("failed to convert token address"),
entry_point_selector: selector!("balanceOf"),
calldata: vec![publisher_address],
},
BlockId::Tag(BlockTag::Latest),
)
.await
.map_err(|e| MonitoringError::OnChain(e.to_string()))?;

let on_chain_balance = token_balance
.first()
.ok_or(MonitoringError::OnChain("No data".to_string()))?
.to_big_decimal(FEE_TOKEN_DECIMALS)
.to_f64()
.ok_or(MonitoringError::Conversion(
"Failed to convert to f64".to_string(),
))?;

Ok(on_chain_balance)
}
27 changes: 21 additions & 6 deletions src/processing/common.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider};

use crate::monitoring::publisher_balance;
use crate::{
config::{get_config, DataType},
constants::INDEXER_BLOCKS_LEFT,
constants::{INDEXER_BLOCKS_LEFT, PUBLISHER_BALANCE},
error::MonitoringError,
};

use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use starknet::core::types::FieldElement;
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Provider};
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct IndexerServerStatus {
pub status: i32,
Expand Down Expand Up @@ -162,3 +162,18 @@ pub async fn query_pragma_api(
))),
}
}

pub async fn check_publisher_balance(
publisher: String,
publisher_address: FieldElement,
) -> Result<(), MonitoringError> {
let config = get_config(None).await;
let balance = publisher_balance(publisher_address).await?;

let network_env = &config.network_str();

PUBLISHER_BALANCE
.with_label_values(&[network_env, &publisher])
.set(balance);
Ok(())
}
Loading