Commit

feat: monitoring options
JordyRo1 committed Oct 25, 2023
1 parent a38c34a commit 64d4019
Showing 11 changed files with 181 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .env.example
@@ -1,2 +1,2 @@
# The database URL the application will use to connect to the database.
DATABASE_URL=
DATABASE_URL='postgres://postgres:A+T|oiWbuK02OwLG:8rAdF7LT~b-@testnet-aurora.proxy-cusx5kdiz1ia.eu-west-3.rds.amazonaws.com/indexed_data?sslmode=disable'
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
/target
/data
.idea
.env
/bin/*
18 changes: 18 additions & 0 deletions prometheus/alertmanager.yml
@@ -0,0 +1,18 @@
global:
  smtp_smarthost: '${SMTP_HOST}'
  smtp_from: '${SMTP_FROM}'
  smtp_auth_username: '${SMTP_AUTH_USERNAME}'
  smtp_auth_password: '${SMTP_AUTH_PASSWORD}'
  smtp_require_tls: true

route:
  group_by: ['instance', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'email_configs'
receivers:
  - name: 'email_configs'
    email_configs:
      - to: '${EMAIL_TO}'
        send_resolved: true
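Note that Alertmanager does not expand ${...} environment variables in its configuration file by itself, so the placeholders above need to be rendered before the process starts (for example with envsubst at deploy time). A minimal sketch of an equivalent Rust helper, assuming the template and output paths shown; neither the helper nor the paths are part of this commit:

use std::env;
use std::fs;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read the committed template.
    let mut config = fs::read_to_string("prometheus/alertmanager.yml")?;

    // Substitute every ${VAR} placeholder with its value from the environment.
    for var in ["SMTP_HOST", "SMTP_FROM", "SMTP_AUTH_USERNAME", "SMTP_AUTH_PASSWORD", "EMAIL_TO"] {
        config = config.replace(&format!("${{{}}}", var), &env::var(var)?);
    }

    // Write the rendered config where Alertmanager will be pointed at it (path is an assumption).
    fs::write("prometheus/alertmanager.rendered.yml", config)?;
    Ok(())
}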
11 changes: 11 additions & 0 deletions prometheus/alerts.rules.yml
@@ -0,0 +1,11 @@
groups:
  - name: example
    rules:
      - alert: TimeSinceLastUpdateTooHigh
        expr: time_since_last_update_seconds > 1200
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Time since the last update is too high"
          description: "The time since the last update of {{ $labels.pair }} from {{ $labels.source }} has exceeded 1200 seconds."
9 changes: 8 additions & 1 deletion prometheus/prometheus.yml
@@ -1,8 +1,15 @@
global:
  scrape_interval: 15s
  scrape_interval: 5s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus_monitoring'
    static_configs:
      - targets: ['127.0.0.1:8080']
rule_files:
  - "alerts.rules.yml"
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - localhost:9093
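The scrape target 127.0.0.1:8080 above assumes the binary exposes its Prometheus metrics over HTTP on port 8080. server.rs is not part of this diff; only the server::run_metrics_server() call in main.rs is. A minimal sketch of what that function might look like, using the prometheus and tokio crates already in use here (the body is an assumption, not the committed implementation):

use prometheus::{Encoder, TextEncoder};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;

pub async fn run_metrics_server() {
    let listener = TcpListener::bind("0.0.0.0:8080").await.expect("failed to bind metrics port");
    loop {
        let (mut socket, _) = match listener.accept().await {
            Ok(conn) => conn,
            Err(_) => continue,
        };
        // Encode every gauge registered in the default registry into the Prometheus text format.
        let mut body = Vec::new();
        if TextEncoder::new().encode(&prometheus::gather(), &mut body).is_err() {
            continue;
        }
        // Answer every request with the metrics payload; a real server would route on /metrics.
        let header = format!(
            "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\n\r\n",
            body.len()
        );
        let _ = socket.write_all(header.as_bytes()).await;
        let _ = socket.write_all(&body).await;
    }
}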
29 changes: 29 additions & 0 deletions src/error.rs
@@ -0,0 +1,29 @@
use std::{error::Error as StdError, fmt};

#[derive(Debug)]
pub enum MonitoringError {
PriceError(String),
TimeError(String),
DatabaseError(diesel::result::Error),
ConnectionError(String),
}

impl StdError for MonitoringError {}

impl fmt::Display for MonitoringError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MonitoringError::PriceError(e) => write!(f, "Price Error: {}", e),
MonitoringError::TimeError(e) => write!(f, "Time Error: {}", e),
MonitoringError::DatabaseError(e) => write!(f, "Database Error: {}", e),
MonitoringError::ConnectionError(e) => write!(f, "Connection Error: {}", e),
}
}
}

// Convert diesel error to our custom error
impl From<diesel::result::Error> for MonitoringError {
fn from(err: diesel::result::Error) -> MonitoringError {
MonitoringError::DatabaseError(err)
}
}
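With the From impl above, any function that returns Result<_, MonitoringError> can propagate diesel failures with the ? operator instead of matching on the error explicitly. A small illustrative sketch; the helper itself is hypothetical and not part of this commit:

use crate::error::MonitoringError;
use crate::models::SpotEntry;

// `?` applies From<diesel::result::Error> and wraps the failure
// as MonitoringError::DatabaseError.
fn into_monitoring_result(res: Result<SpotEntry, diesel::result::Error>) -> Result<SpotEntry, MonitoringError> {
    Ok(res?)
}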
56 changes: 12 additions & 44 deletions src/main.rs
@@ -2,34 +2,22 @@
extern crate diesel;
extern crate dotenv;

use crate::diesel::ExpressionMethods;
use crate::models::SpotEntry;
use diesel::QueryDsl;
use diesel_async::pooled_connection::deadpool::*;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use diesel_async::AsyncPgConnection;
use dotenv::dotenv;

use schema::spot_entry::dsl::*;
use std::error::Error;
use std::env;
mod error;
mod models;
mod monitoring;
mod process_data;
mod schema;
use prometheus::{opts, register_gauge_vec, GaugeVec};
mod server;

lazy_static::lazy_static! {
static ref TIME_SINCE_LAST_UPDATE: GaugeVec = register_gauge_vec!(
opts!("time_since_last_updatee_seconds", "Time since the last update in seconds."),
&["pair", "source"]
).unwrap();
}

#[tokio::main]
async fn main() {
dotenv().ok();
let pairs = vec![("BTC/USD", "CEX"), ("ETH/USD", "CEX")];
let pairs = vec![("BTC/USD", "CEX", 8), ("ETH/USD", "CEX",8), ("BTC/USD", "COINBASE",8), ("ETH/USD", "COINBASE",8), ("BTC/USD", "BITSTAMP",8), ("ETH/USD", "BITSTAMP",8),("BTC/USD", "OKX",8), ("ETH/USD", "OKX",8),("BTC/USD", "GECKOTERMINAL",8), ("ETH/USD", "GECKOTERMINAL",8), ("BTC/USD", "KAIKO",8), ("ETH/USD", "KAIKO",8),];
tokio::spawn(server::run_metrics_server());
let database_url: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(database_url);
@@ -41,11 +29,14 @@ async fn main() {
let tasks: Vec<_> = pairs
.clone()
.into_iter()
.map(|(pair, srce)| {
let pool_reference: deadpool::managed::Pool<
AsyncDieselConnectionManager<AsyncPgConnection>,
> = pool.clone();
tokio::spawn(Box::pin(process_data(pool_reference, pair, srce)))
.flat_map(|(pair, srce, decimals)| {
let pool_ref_for_pair = pool.clone();
let pool_ref_for_pair_and_source = pool.clone();

vec![
tokio::spawn(Box::pin(process_data::process_data_by_pair(pool_ref_for_pair, pair, decimals))),
tokio::spawn(Box::pin(process_data::process_data_by_pair_and_source(pool_ref_for_pair_and_source, pair, srce, decimals))),
]
})
.collect();

@@ -64,26 +55,3 @@
}
}
}

async fn process_data(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
pair: &str,
srce: &str,
) -> Result<u64, Box<dyn Error + Send>> {
let mut conn = pool.get().await.unwrap();
let result: Result<SpotEntry, _> = spot_entry
.filter(pair_id.eq(pair))
.filter(source.eq(srce))
.order(block_timestamp.desc())
.first(&mut conn)
.await;
match result {
Ok(data) => {
let time = monitoring::timeLastUpdate::time_since_last_update(data).await;
let labels = TIME_SINCE_LAST_UPDATE.with_label_values(&[pair, srce]);
labels.set(time as f64);
Ok(time)
}
Err(e) => Err(Box::new(e)),
}
}
3 changes: 2 additions & 1 deletion src/monitoring/mod.rs
@@ -1 +1,2 @@
pub mod timeLastUpdate;
pub mod timeLastUpdateSource;
pub mod timeLastUpdatePairId;
9 changes: 9 additions & 0 deletions src/monitoring/timeLastUpdatePairId.rs
@@ -0,0 +1,9 @@
use crate::models::SpotEntry;
use chrono::{DateTime, TimeZone, Utc};
use std::time::SystemTime;
pub async fn time_last_update_pair_id(query: &SpotEntry) -> u64 {
let datetime: DateTime<Utc> = TimeZone::from_utc_datetime(&Utc, &query.timestamp.unwrap());
let timestamp = datetime.timestamp();
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH);
now.unwrap().as_secs() - timestamp as u64
}
2 changes: 1 addition & 1 deletion src/monitoring/timeLastUpdateSource.rs
@@ -1,7 +1,7 @@
use crate::models::SpotEntry;
use chrono::{DateTime, TimeZone, Utc};
use std::time::SystemTime;
pub async fn time_since_last_update(query: SpotEntry) -> u64 {
pub async fn time_since_last_update(query: &SpotEntry) -> u64 {
let datetime: DateTime<Utc> = TimeZone::from_utc_datetime(&Utc, &query.timestamp.unwrap());
let timestamp = datetime.timestamp();
let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH);
89 changes: 89 additions & 0 deletions src/process_data.rs
@@ -0,0 +1,89 @@
extern crate diesel;
extern crate dotenv;
use crate::models::SpotEntry;
use bigdecimal::ToPrimitive;
use diesel::ExpressionMethods;
use diesel_async::AsyncPgConnection;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use crate::schema::spot_entry::dsl::*;
use crate::monitoring::timeLastUpdateSource::time_since_last_update;
use crate::monitoring::timeLastUpdatePairId::time_last_update_pair_id;
use diesel::sql_types::Text;
use prometheus::{opts, register_gauge_vec, GaugeVec};
use crate::diesel::QueryDsl;
use diesel_async::RunQueryDsl;
use crate::error::MonitoringError;
lazy_static::lazy_static! {
static ref TIME_SINCE_LAST_UPDATE_SOURCE: GaugeVec = register_gauge_vec!(
opts!("time_since_last_updatee_seconds", "Time since the last update in seconds."),
&["pair", "source"]
).unwrap();
}

lazy_static::lazy_static! {
static ref PAIR_PRICE: GaugeVec = register_gauge_vec!(
opts!("pair_price", "Price of the pair from the source."),
&["pair", "source"]
).unwrap();
}

lazy_static::lazy_static! {
static ref TIME_SINCE_LAST_UPDATE_PAIR_ID: GaugeVec = register_gauge_vec!(
opts!("time_since_last_update_pair_id", "Time since the last update in seconds."),
&["pair"]
).unwrap();
}

pub async fn process_data_by_pair(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
pair: &str,
decimals: u32,
) -> Result<u64, MonitoringError> {
let mut conn = pool.get().await.map_err(|_| MonitoringError::ConnectionError("Failed to get connection".to_string()))?;
let result: Result<SpotEntry, _> = spot_entry
.filter(pair_id.eq(pair))
.order(block_timestamp.desc())
.first(&mut conn)
.await;
match result {
Ok(data) => {
let minute_since_last_publish = time_last_update_pair_id(&data).await;
let time_labels = TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[pair]);
time_labels.set(minute_since_last_publish as f64);
Ok(minute_since_last_publish)
},
Err(e) => Err(e.into())
}
}


pub async fn process_data_by_pair_and_source(
pool: deadpool::managed::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>,
pair: &str,
srce: &str,
decimals: u32
) -> Result<u64, MonitoringError> {
let mut conn = pool.get().await.map_err(|_| MonitoringError::ConnectionError("Failed to get connection".to_string()))?;

let filtered_by_source_result: Result<SpotEntry, _> = spot_entry
.filter(pair_id.eq(pair))
.filter(source.eq(srce))
.order(block_timestamp.desc())
.first(&mut conn)
.await;


match filtered_by_source_result {
Ok(data) => {
let time = time_since_last_update(&data).await;
let time_labels = TIME_SINCE_LAST_UPDATE_SOURCE.with_label_values(&[pair, srce]);
let price_labels = PAIR_PRICE.with_label_values(&[&pair, &srce]);
let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::PriceError("Failed to convert BigDecimal to f64".to_string()))?;
price_labels.set(price_as_f64 / 10_f64.powi(decimals as i32));
time_labels.set(time as f64);
Ok(time)
},
Err(e) => Err(e.into())
}
}
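The exported pair_price gauge is the raw integer price stored in spot_entry scaled down by 10^decimals. For example, with the 8 decimals configured in main.rs, a raw value of 4_250_000_000_000 is exported as 42_500.0. A minimal check of that arithmetic (the raw value is made up for illustration):

fn main() {
    let raw_price: f64 = 4_250_000_000_000_f64; // illustrative raw spot_entry price
    let decimals: u32 = 8;
    let scaled = raw_price / 10_f64.powi(decimals as i32);
    assert_eq!(scaled, 42_500.0);
    println!("pair_price gauge value: {scaled}");
}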
