Skip to content

Commit

Permalink
Merge branch 'main' into Processing-function-for-publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
JordyRo1 committed Jan 24, 2024
2 parents eff7212 + 2ad5794 commit 90d5e33
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 15 deletions.
8 changes: 8 additions & 0 deletions prometheus/alerts.rules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ groups:
annotations:
summary: "Publisher balance is low"
description: "The {{ $labels.publisher }} balance is below 0.1 ETH."
- alert: PriceDeviationTooHigh
expr: on_off_price_deviation > 0.025
for: 5m
labels:
severity: critical
annotations:
summary: "Price deviation is too high"
description: "The on-chain price of {{ $labels.pair }} from the reference price has exceeded 2.5%."
- name: API
rules:
- alert: TimeSinceLastUpdateTooHigh
Expand Down
8 changes: 8 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ lazy_static! {
&["network", "pair"]
)
.unwrap();
pub static ref ON_OFF_PRICE_DEVIATION: GaugeVec = register_gauge_vec!(
opts!(
"on_off_price_deviation",
"On chain price deviation from the reference price"
),
&["network", "pair", "type"]
)
.unwrap();
pub static ref API_TIME_SINCE_LAST_UPDATE: GaugeVec = register_gauge_vec!(
opts!(
"api_time_since_last_update",
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ async fn main() {
let database_url: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(database_url);
let pool = Pool::builder(config).build().unwrap();

// Monitor spot/future in parallel
let spot_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Spot));
let future_monitoring = tokio::spawn(monitor(pool.clone(), true, &DataType::Future));
Expand Down
2 changes: 2 additions & 0 deletions src/monitoring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod on_off_deviation;
pub mod price_deviation;
pub mod publisher_balance;
pub mod source_deviation;
pub mod time_since_last_update;

pub use on_off_deviation::on_off_price_deviation;
pub use price_deviation::price_deviation;
pub use publisher_balance::publisher_balance;
pub use source_deviation::source_deviation;
Expand Down
134 changes: 134 additions & 0 deletions src/monitoring/on_off_deviation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use bigdecimal::ToPrimitive;
use starknet::{
core::{
types::{BlockId, BlockTag, FieldElement, FunctionCall},
utils::cairo_short_string_to_felt,
},
macros::selector,
providers::Provider,
};

use crate::monitoring::price_deviation::CoinPricesDTO;
use crate::{
config::{get_config, DataType},
constants::COINGECKO_IDS,
error::MonitoringError,
};

/// On-chain price deviation from the reference price.
/// Returns the deviation and the number of sources aggregated.
///
/// # Arguments
///
/// * `pair_id` - The pair id.
/// * `timestamp` - The timestamp for which to get the price.
/// * `data_type` - The type of data to get.
///
/// # Returns
///
/// * `Ok((deviation, num_sources_aggregated))` - The deviation and the number of sources aggregated.
/// * `Err(MonitoringError)` - The error.
pub async fn on_off_price_deviation(
pair_id: String,
timestamp: u64,
data_type: DataType,
) -> Result<(f64, u32), MonitoringError> {
let ids = &COINGECKO_IDS;
let config = get_config(None).await;
let client = &config.network().provider;
let field_pair = cairo_short_string_to_felt(&pair_id).expect("failed to convert pair id");

let calldata = match data_type {
DataType::Spot => vec![FieldElement::ZERO, field_pair],
DataType::Future => vec![FieldElement::ONE, field_pair, FieldElement::ZERO],
};

let data = client
.call(
FunctionCall {
contract_address: config.network().oracle_address,
entry_point_selector: selector!("get_data_median"),
calldata,
},
BlockId::Tag(BlockTag::Latest),
)
.await
.map_err(|e| MonitoringError::OnChain(e.to_string()))?;

let decimals =
config
.decimals(data_type.clone())
.get(&pair_id)
.ok_or(MonitoringError::OnChain(format!(
"Failed to get decimals for pair {:?}",
pair_id
)))?;

let on_chain_price = data
.first()
.ok_or(MonitoringError::OnChain("No data".to_string()))?
.to_big_decimal(*decimals)
.to_f64()
.ok_or(MonitoringError::Conversion(
"Failed to convert to f64".to_string(),
))?;

let (deviation, num_sources_aggregated) = match data_type {
DataType::Spot => {
let coingecko_id = *ids.get(&pair_id).expect("Failed to get coingecko id");

let api_key = std::env::var("DEFILLAMA_API_KEY");

let request_url = if let Ok(api_key) = api_key {
format!(
"https://coins.llama.fi/prices/historical/{timestamp}/coingecko:{id}?apikey={apikey}",
timestamp = timestamp,
id = coingecko_id,
apikey = api_key
)
} else {
format!(
"https://coins.llama.fi/prices/historical/{timestamp}/coingecko:{id}",
timestamp = timestamp,
id = coingecko_id,
)
};

let response = reqwest::get(&request_url)
.await
.map_err(|e| MonitoringError::Api(e.to_string()))?;

let coins_prices: CoinPricesDTO = response.json().await.map_err(|e| {
MonitoringError::Api(format!(
"Failed to convert to DTO object, got error {:?}",
e.to_string()
))
})?;

let api_id = format!("coingecko:{}", coingecko_id);

let reference_price = coins_prices
.get_coins()
.get(&api_id)
.ok_or(MonitoringError::Api(format!(
"Failed to get coingecko price for id {:?}",
coingecko_id
)))?
.get_price();

let deviation = (reference_price - on_chain_price) / on_chain_price;
let num_sources_aggregated = (*data.get(3).unwrap()).try_into().map_err(|e| {
MonitoringError::Conversion(format!("Failed to convert num sources {:?}", e))
})?;
(deviation, num_sources_aggregated)
}

DataType::Future => {
// TODO: work on a different API for futures

(0.0, 5)
}
};

Ok((deviation, num_sources_aggregated))
}
15 changes: 13 additions & 2 deletions src/monitoring/price_deviation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,30 @@ use crate::{constants::COINGECKO_IDS, error::MonitoringError, types::Entry};
// }
// }
#[derive(serde::Deserialize, Debug)]
struct CoinPricesDTO {
pub struct CoinPricesDTO {
coins: HashMap<String, CoinPriceDTO>,
}

#[allow(unused)]
#[derive(serde::Deserialize, Debug)]
struct CoinPriceDTO {
pub struct CoinPriceDTO {
price: f64,
symbol: String,
timestamp: u64,
confidence: f64,
}

impl CoinPricesDTO {
pub fn get_coins(&self) -> &HashMap<String, CoinPriceDTO> {
&self.coins
}
}
impl CoinPriceDTO {
pub fn get_price(&self) -> f64 {
self.price
}
}

/// Calculates the deviation of the price from a trusted API (DefiLLama)
pub async fn price_deviation<T: Entry>(
query: &T,
Expand Down
24 changes: 19 additions & 5 deletions src/processing/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::config::get_config;
use crate::config::DataType;
use crate::config::NetworkName;
use crate::constants::NUM_SOURCES;
use crate::constants::ON_OFF_PRICE_DEVIATION;
use crate::constants::PAIR_PRICE;
use crate::constants::PRICE_DEVIATION;
use crate::constants::PRICE_DEVIATION_SOURCE;
Expand All @@ -13,7 +14,9 @@ use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER;
use crate::diesel::QueryDsl;
use crate::error::MonitoringError;
use crate::models::FutureEntry;
use crate::monitoring::{price_deviation, source_deviation, time_since_last_update};
use crate::monitoring::{
on_off_price_deviation, price_deviation, source_deviation, time_since_last_update,
};

use crate::schema::future_entry::dsl as testnet_dsl;
use crate::schema::mainnet_future_entry::dsl as mainnet_dsl;
Expand Down Expand Up @@ -62,9 +65,23 @@ pub async fn process_data_by_pair(
let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]);
let num_sources_labels =
NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]);

time_labels.set(seconds_since_last_publish as f64);

let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation(
pair.clone(),
data.timestamp.timestamp() as u64,
DataType::Future,
)
.await?;

ON_OFF_PRICE_DEVIATION
.with_label_values(&[network_env, &pair.clone(), data_type])
.set(on_off_deviation);
num_sources_labels.set(num_sources_aggregated as i64);

Ok(seconds_since_last_publish)
}
Err(e) => Err(e.into()),
Expand Down Expand Up @@ -153,7 +170,6 @@ pub async fn process_data_by_pair_and_source(
PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]);
let source_deviation_labels =
PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]);
let num_sources_labels = NUM_SOURCES.with_label_values(&[network_env, pair, data_type]);

// Compute metrics
let time = time_since_last_update(&data);
Expand All @@ -163,14 +179,12 @@ pub async fn process_data_by_pair_and_source(
let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64;

let deviation = price_deviation(&data, normalized_price).await?;
let (source_deviation, num_sources_aggregated) =
source_deviation(&data, normalized_price).await?;
let (source_deviation, _) = source_deviation(&data, normalized_price).await?;

// Set the metrics
price_labels.set(normalized_price);
deviation_labels.set(deviation);
source_deviation_labels.set(source_deviation);
num_sources_labels.set(num_sources_aggregated as i64);

Ok(time)
}
Expand Down
26 changes: 19 additions & 7 deletions src/processing/spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::config::get_config;
use crate::config::DataType;
use crate::config::NetworkName;
use crate::constants::NUM_SOURCES;
use crate::constants::ON_OFF_PRICE_DEVIATION;
use crate::constants::PAIR_PRICE;
use crate::constants::PRICE_DEVIATION;
use crate::constants::PRICE_DEVIATION_SOURCE;
Expand All @@ -13,7 +14,9 @@ use crate::constants::TIME_SINCE_LAST_UPDATE_PUBLISHER;
use crate::diesel::QueryDsl;
use crate::error::MonitoringError;
use crate::models::SpotEntry;
use crate::monitoring::{price_deviation, source_deviation, time_since_last_update};
use crate::monitoring::{
on_off_price_deviation, price_deviation, source_deviation, time_since_last_update,
};

use crate::schema::mainnet_spot_entry::dsl as mainnet_dsl;
use crate::schema::spot_entry::dsl as testnet_dsl;
Expand Down Expand Up @@ -62,8 +65,21 @@ pub async fn process_data_by_pair(
let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]);

let num_sources_labels =
NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]);

let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation(
pair.clone(),
data.timestamp.timestamp() as u64,
DataType::Spot,
)
.await?;

ON_OFF_PRICE_DEVIATION
.with_label_values(&[network_env, &pair.clone(), data_type])
.set(on_off_deviation);
time_labels.set(seconds_since_last_publish as f64);
num_sources_labels.set(num_sources_aggregated as i64);

Ok(seconds_since_last_publish)
}
Expand All @@ -77,7 +93,6 @@ pub async fn process_data_by_pair_and_sources(
sources: Vec<String>,
) -> Result<u64, MonitoringError> {
let mut timestamps = Vec::new();

let config = get_config(None).await;

let decimals = *config.decimals(DataType::Spot).get(&pair.clone()).unwrap();
Expand Down Expand Up @@ -150,7 +165,6 @@ pub async fn process_data_by_pair_and_source(
PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]);
let source_deviation_labels =
PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]);
let num_sources_labels = NUM_SOURCES.with_label_values(&[network_env, pair, data_type]);

// Compute metrics
let time = time_since_last_update(&data);
Expand All @@ -160,14 +174,12 @@ pub async fn process_data_by_pair_and_source(
let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64;

let deviation = price_deviation(&data, normalized_price).await?;
let (source_deviation, num_sources_aggregated) =
source_deviation(&data, normalized_price).await?;
let (source_deviation, _) = source_deviation(&data, normalized_price).await?;

// Set the metrics
price_labels.set(normalized_price);
deviation_labels.set(deviation);
source_deviation_labels.set(source_deviation);
num_sources_labels.set(num_sources_aggregated as i64);

Ok(time)
}
Expand Down

0 comments on commit 90d5e33

Please sign in to comment.