From 52c37f25dd7c810d289b9fc31933982fc6316028 Mon Sep 17 00:00:00 2001 From: Philipp Hoenisch Date: Tue, 28 May 2024 14:46:20 +1000 Subject: [PATCH 1/2] chore: remove prometheus metrics endpoint These weren't used but exposed so we better remove them. --- coordinator/src/bin/coordinator.rs | 19 --- coordinator/src/lib.rs | 1 - coordinator/src/metrics.rs | 217 ----------------------------- coordinator/src/routes.rs | 32 ----- 4 files changed, 269 deletions(-) delete mode 100644 coordinator/src/metrics.rs diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index d9ab1344c..42c550b9d 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -8,8 +8,6 @@ use coordinator::dlc_handler::DlcHandler; use coordinator::logger; use coordinator::message::spawn_delivering_messages_to_authenticated_users; use coordinator::message::NewUserMessage; -use coordinator::metrics; -use coordinator::metrics::init_meter; use coordinator::node::expired_positions; use coordinator::node::liquidated_positions; use coordinator::node::rollover; @@ -47,7 +45,6 @@ use xxi_node::node::event::NodeEventHandler; use xxi_node::seed::Bip39Seed; use xxi_node::storage::DlcChannelEvent; -const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10); const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200); const LIQUIDATED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30); const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60); @@ -76,8 +73,6 @@ async fn main() -> Result<()> { }), ); - let exporter = init_meter(); - let opts = Opts::read(); let data_dir = opts.data_dir()?; let address = opts.p2p_address; @@ -232,19 +227,6 @@ async fn main() -> Result<()> { } }); - tokio::spawn({ - let node = node.clone(); - async move { - loop { - let node = node.clone(); - spawn_blocking(move || metrics::collect(node)) - .await - .expect("To spawn blocking thread"); - 
tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await; - } - } - }); - tokio::spawn({ let node = node.clone(); async move { @@ -323,7 +305,6 @@ async fn main() -> Result<()> { node.clone(), pool.clone(), settings.clone(), - exporter, NODE_ALIAS, trading_sender, tx_orderbook_feed, diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index 75908d688..c0da46cc0 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -32,7 +32,6 @@ pub mod dlc_handler; pub mod dlc_protocol; pub mod logger; pub mod message; -pub mod metrics; pub mod node; pub mod notifications; pub mod orderbook; diff --git a/coordinator/src/metrics.rs b/coordinator/src/metrics.rs deleted file mode 100644 index 779252517..000000000 --- a/coordinator/src/metrics.rs +++ /dev/null @@ -1,217 +0,0 @@ -use crate::db; -use crate::node::storage::NodeStorage; -use crate::node::Node; -use crate::storage::CoordinatorTenTenOneStorage; -use lazy_static::lazy_static; -use opentelemetry::global; -use opentelemetry::metrics::Meter; -use opentelemetry::metrics::ObservableGauge; -use opentelemetry::sdk::export::metrics::aggregation; -use opentelemetry::sdk::metrics::controllers; -use opentelemetry::sdk::metrics::processors; -use opentelemetry::sdk::metrics::selectors; -use opentelemetry::Context; -use opentelemetry::KeyValue; -use opentelemetry_prometheus::PrometheusExporter; -use std::sync::Arc; -use std::time::Duration; -use xxi_node::commons::ContractSymbol; -use xxi_node::commons::Direction; - -lazy_static! 
{ - pub static ref METER: Meter = global::meter("maker"); - - // channel details metrics - pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge = METER - .u64_observable_gauge("channel_balance_satoshi") - .with_description("Current channel balance in satoshi") - .init(); - pub static ref CHANNEL_OUTBOUND_CAPACITY_SATOSHI: ObservableGauge = METER - .u64_observable_gauge("channel_outbound_capacity_satoshi") - .with_description("Channel outbound capacity in satoshi") - .init(); - pub static ref CHANNEL_INBOUND_CAPACITY_SATOSHI: ObservableGauge = METER - .u64_observable_gauge("channel_inbound_capacity_satoshi") - .with_description("Channel inbound capacity in satoshi") - .init(); - pub static ref CHANNEL_IS_USABLE: ObservableGauge = METER - .u64_observable_gauge("channel_is_usable") - .with_description("If a channel is usable") - .init(); - pub static ref DLC_CHANNELS_AMOUNT: ObservableGauge = METER - .u64_observable_gauge("dlc_channel_amount") - .with_description("Number of DLC channels") - .init(); - pub static ref PUNISHED_DLC_CHANNELS_AMOUNT: ObservableGauge = METER - .u64_observable_gauge("punished_dlc_channel_amount") - .with_description("Number of punished DLC channels") - .init(); - - // general node metrics - pub static ref CONNECTED_PEERS: ObservableGauge = METER - .u64_observable_gauge("node_connected_peers_total") - .with_description("Total number of connected peers") - .init(); - pub static ref NODE_BALANCE_SATOSHI: ObservableGauge = METER - .u64_observable_gauge("node_balance_satoshi") - .with_description("Node balance in satoshi") - .init(); - - // position metrics - pub static ref POSITION_QUANTITY: ObservableGauge = METER - .f64_observable_gauge("position_quantity_contracts") - .with_description("Current open position in contracts") - .init(); - pub static ref POSITION_MARGIN: ObservableGauge = METER - .i64_observable_gauge("position_margin_sats") - .with_description("Current open position margin in sats") - .init(); -} - -pub fn init_meter() -> 
PrometheusExporter { - let controller = controllers::basic(processors::factory( - selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), - aggregation::cumulative_temporality_selector(), - )) - .with_collect_period(Duration::from_secs(10)) - .build(); - - opentelemetry_prometheus::exporter(controller).init() -} - -pub fn collect(node: Node) { - let cx = opentelemetry::Context::current(); - position_metrics(&cx, &node); - - let inner_node = node.inner; - - node_metrics(&cx, inner_node); -} - -fn position_metrics(cx: &Context, node: &Node) { - let mut conn = match node.pool.get() { - Ok(conn) => conn, - Err(e) => { - tracing::error!("Failed to get pool connection. Error: {e:?}"); - return; - } - }; - - let positions = match db::positions::Position::get_all_open_positions(&mut conn) { - Ok(positions) => positions, - Err(e) => { - tracing::error!("Failed to get positions. Error: {e:?}"); - return; - } - }; - - let mut margin_long = 0; - let mut margin_short = 0; - let mut quantity_long = 0.0; - let mut quantity_short = 0.0; - - // Note: we should filter positions here by BTCUSD once we have multiple contract symbols - - for position in positions { - debug_assert!( - position.contract_symbol == ContractSymbol::BtcUsd, - "We should filter positions here by BTCUSD once we have multiple contract symbols" - ); - match position.trader_direction { - Direction::Long => { - // TODO: fix me: this was meant to be the traders margin - margin_long += position.coordinator_margin; - quantity_long += position.quantity; - } - Direction::Short => { - margin_short += position.coordinator_margin; - quantity_short += position.quantity; - } - } - } - POSITION_QUANTITY.observe( - cx, - quantity_long as f64, - &[ - KeyValue::new("symbol", "BTCUSD"), - KeyValue::new("status", "open"), - KeyValue::new("direction", "long"), - ], - ); - POSITION_QUANTITY.observe( - cx, - quantity_short as f64, - &[ - KeyValue::new("symbol", "BTCUSD"), - KeyValue::new("status", "open"), - 
KeyValue::new("direction", "short"), - ], - ); - POSITION_MARGIN.observe( - cx, - margin_long, - &[ - KeyValue::new("symbol", "BTCUSD"), - KeyValue::new("status", "open"), - KeyValue::new("direction", "long"), - ], - ); - POSITION_MARGIN.observe( - cx, - margin_short, - &[ - KeyValue::new("symbol", "BTCUSD"), - KeyValue::new("status", "open"), - KeyValue::new("direction", "short"), - ], - ); -} - -fn node_metrics( - cx: &Context, - inner_node: Arc< - xxi_node::node::Node< - bdk_file_store::Store, - CoordinatorTenTenOneStorage, - NodeStorage, - >, - >, -) { - let connected_peers = inner_node.list_peers().len(); - CONNECTED_PEERS.observe(cx, connected_peers as u64, &[]); - - let balance = inner_node.get_on_chain_balance(); - - NODE_BALANCE_SATOSHI.observe( - cx, - balance.confirmed, - &[ - KeyValue::new("type", "on-chain"), - KeyValue::new("status", "confirmed"), - ], - ); - NODE_BALANCE_SATOSHI.observe( - cx, - balance.immature, - &[ - KeyValue::new("type", "on-chain"), - KeyValue::new("status", "immature"), - ], - ); - NODE_BALANCE_SATOSHI.observe( - cx, - balance.trusted_pending, - &[ - KeyValue::new("type", "on-chain"), - KeyValue::new("status", "trusted_pending"), - ], - ); - NODE_BALANCE_SATOSHI.observe( - cx, - balance.untrusted_pending, - &[ - KeyValue::new("type", "on-chain"), - KeyValue::new("status", "untrusted_pending"), - ], - ); -} diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 9de5c0c5b..3123e5ac7 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -45,7 +45,6 @@ use axum::extract::Path; use axum::extract::Query; use axum::extract::State; use axum::extract::WebSocketUpgrade; -use axum::http::StatusCode; use axum::response::IntoResponse; use axum::routing::delete; use axum::routing::get; @@ -63,14 +62,11 @@ use diesel::r2d2::Pool; use diesel::PgConnection; use lnd_bridge::InvoiceParams; use lnd_bridge::LndBridge; -use opentelemetry_prometheus::PrometheusExporter; use orderbook::delete_order; use 
orderbook::get_order; use orderbook::get_orders; use orderbook::post_order; use orderbook::websocket_handler; -use prometheus::Encoder; -use prometheus::TextEncoder; use serde::Serialize; use std::net::SocketAddr; use std::str::FromStr; @@ -110,7 +106,6 @@ pub struct AppState { pub trading_sender: mpsc::Sender, pub pool: Pool>, pub settings: RwLock, - pub exporter: PrometheusExporter, pub node_alias: String, pub auth_users_notifier: mpsc::Sender, pub notification_sender: mpsc::Sender, @@ -124,7 +119,6 @@ pub fn router( node: Node, pool: Pool>, settings: Settings, - exporter: PrometheusExporter, node_alias: &str, trading_sender: mpsc::Sender, tx_orderbook_feed: broadcast::Sender, @@ -145,7 +139,6 @@ pub fn router( tx_position_feed, tx_user_feed, trading_sender, - exporter, node_alias: node_alias.to_string(), auth_users_notifier, notification_sender, @@ -219,7 +212,6 @@ pub fn router( "/api/admin/users/:trader_pubkey/referrals", get(get_user_referral_status), ) - .route("/metrics", get(get_metrics)) .route("/health", get(get_health)) .route("/api/leaderboard", get(get_leaderboard)) .route( @@ -359,30 +351,6 @@ pub async fn get_user( } } -pub async fn get_metrics(State(state): State>) -> impl IntoResponse { - let exporter = state.exporter.clone(); - let encoder = TextEncoder::new(); - let metric_families = exporter.registry().gather(); - let mut result = vec![]; - match encoder.encode(&metric_families, &mut result) { - Ok(()) => (), - Err(err) => { - tracing::error!("Could not collect opentelemetry metrics {err:#}"); - return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err)); - } - }; - - let open_telemetry_metrics = match String::from_utf8(result) { - Ok(s) => s, - Err(err) => { - tracing::error!("Could not format metrics as string {err:#}"); - return (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", err)); - } - }; - - (StatusCode::OK, open_telemetry_metrics) -} - pub async fn get_health() -> Result, AppError> { // TODO: Implement any health check logic 
we'd need // So far this just returns if the server is running From ce43bddd22e428e50363b16ce9ed8ad752cb9a7b Mon Sep 17 00:00:00 2001 From: Philipp Hoenisch Date: Tue, 28 May 2024 15:24:54 +1000 Subject: [PATCH 2/2] feat(coordinator): collect metrics Creates a new table called `metrics` which is meant to be used to store business metrics which are not collected otherwise, such as, - on-chain balance - ln-balance --- .../prod-coordinator-settings.toml | 1 + .../test-coordinator-settings.toml | 1 + .../down.sql | 1 + .../up.sql | 6 +++ coordinator/src/bin/coordinator.rs | 5 ++ coordinator/src/db/metrics.rs | 16 ++++++ coordinator/src/db/mod.rs | 1 + coordinator/src/lib.rs | 1 + coordinator/src/metrics.rs | 20 ++++++++ coordinator/src/scheduler.rs | 50 +++++++++++++++++++ coordinator/src/schema.rs | 9 ++++ coordinator/src/settings.rs | 13 +++++ 12 files changed, 124 insertions(+) create mode 100644 coordinator/migrations/2024-05-28-045644_add-metrics-table/down.sql create mode 100644 coordinator/migrations/2024-05-28-045644_add-metrics-table/up.sql create mode 100644 coordinator/src/db/metrics.rs create mode 100644 coordinator/src/metrics.rs diff --git a/coordinator/example-settings/prod-coordinator-settings.toml b/coordinator/example-settings/prod-coordinator-settings.toml index acd11558a..f276f9482 100644 --- a/coordinator/example-settings/prod-coordinator-settings.toml +++ b/coordinator/example-settings/prod-coordinator-settings.toml @@ -4,6 +4,7 @@ rollover_window_close_scheduler = "0 5 13 * * 5,6" close_expired_position_scheduler = "0 0 12 * * *" close_liquidated_position_scheduler = "0 0 12 * * *" update_user_bonus_status_scheduler = "0 0 0 * * *" +collect_metrics_scheduler = "0 0 * * * *" whitelist_enabled = false whitelisted_makers = [] min_quantity = 1 diff --git a/coordinator/example-settings/test-coordinator-settings.toml b/coordinator/example-settings/test-coordinator-settings.toml index 940dfc095..9c2025ac8 100644 --- 
a/coordinator/example-settings/test-coordinator-settings.toml +++ b/coordinator/example-settings/test-coordinator-settings.toml @@ -4,6 +4,7 @@ rollover_window_close_scheduler = "0 5 22 * * *" close_expired_position_scheduler = "0 0 12 * * *" close_liquidated_position_scheduler = "0 0 12 * * *" update_user_bonus_status_scheduler = "0 0 0 * * *" +collect_metrics_scheduler = "0 0 * * * *" whitelist_enabled = false # Default testnet maker whitelisted_makers = ["035eccdd1f05c65b433cf38e3b2597e33715e0392cb14d183e812f1319eb7b6794"] diff --git a/coordinator/migrations/2024-05-28-045644_add-metrics-table/down.sql b/coordinator/migrations/2024-05-28-045644_add-metrics-table/down.sql new file mode 100644 index 000000000..4c57c6bbb --- /dev/null +++ b/coordinator/migrations/2024-05-28-045644_add-metrics-table/down.sql @@ -0,0 +1 @@ +drop table if exists metrics; diff --git a/coordinator/migrations/2024-05-28-045644_add-metrics-table/up.sql b/coordinator/migrations/2024-05-28-045644_add-metrics-table/up.sql new file mode 100644 index 000000000..c5abca94e --- /dev/null +++ b/coordinator/migrations/2024-05-28-045644_add-metrics-table/up.sql @@ -0,0 +1,6 @@ +create table if not exists metrics +( + id SERIAL PRIMARY KEY NOT NULL, + created_at timestamp WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + on_chain_balance_sats BIGINT NOT NULL +); diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 42c550b9d..e6f4cf210 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -343,6 +343,11 @@ async fn main() -> Result<()> { .await .expect("To add the close liquidated position reminder job"); + scheduler + .add_collect_metrics_job(pool.clone()) + .await + .expect("To add the collect metrics job"); + scheduler .start() .await diff --git a/coordinator/src/db/metrics.rs b/coordinator/src/db/metrics.rs new file mode 100644 index 000000000..0813ca77a --- /dev/null +++ b/coordinator/src/db/metrics.rs @@ -0,0 +1,16 @@ 
+use crate::schema::metrics; +use anyhow::ensure; +use anyhow::Result; +use diesel::ExpressionMethods; +use diesel::PgConnection; +use diesel::RunQueryDsl; + +pub fn create_metrics_entry(conn: &mut PgConnection, on_chain_balance: u64) -> Result<()> { + let affected_rows = diesel::insert_into(metrics::table) + .values(metrics::on_chain_balance_sats.eq(on_chain_balance as i64)) + .execute(conn)?; + + ensure!(affected_rows > 0, "Could not insert metric entry"); + + Ok(()) +} diff --git a/coordinator/src/db/mod.rs b/coordinator/src/db/mod.rs index f6c706e63..3bce9e316 100644 --- a/coordinator/src/db/mod.rs +++ b/coordinator/src/db/mod.rs @@ -9,6 +9,7 @@ pub mod dlc_protocols; pub mod hodl_invoice; pub mod last_outbound_dlc_message; pub mod liquidity_options; +pub mod metrics; pub mod polls; pub mod positions; pub mod reported_errors; diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index c0da46cc0..106f2f969 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -32,6 +32,7 @@ pub mod dlc_handler; pub mod dlc_protocol; pub mod logger; pub mod message; +mod metrics; pub mod node; pub mod notifications; pub mod orderbook; diff --git a/coordinator/src/metrics.rs b/coordinator/src/metrics.rs new file mode 100644 index 000000000..0f2ccc47c --- /dev/null +++ b/coordinator/src/metrics.rs @@ -0,0 +1,20 @@ +use crate::db; +use crate::node::Node; +use anyhow::Result; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::PooledConnection; +use diesel::PgConnection; + +pub fn collect_metrics( + mut conn: PooledConnection>, + node: Node, +) -> Result<()> { + let balance = node.inner.wallet().get_balance(); + db::metrics::create_metrics_entry( + &mut conn, + balance.confirmed + balance.untrusted_pending + balance.trusted_pending + balance.immature, + )?; + // TODO: also collect LN balance + + Ok(()) +} diff --git a/coordinator/src/scheduler.rs b/coordinator/src/scheduler.rs index 4ae855819..ffb5540e1 100644 --- a/coordinator/src/scheduler.rs +++ 
b/coordinator/src/scheduler.rs @@ -1,4 +1,5 @@ use crate::db; +use crate::metrics::collect_metrics; use crate::node::Node; use crate::notifications::Notification; use crate::notifications::NotificationKind; @@ -62,6 +63,27 @@ impl NotificationScheduler { Ok(()) } + pub async fn add_collect_metrics_job( + &self, + pool: Pool>, + ) -> Result<()> { + let schedule = self.settings.collect_metrics_scheduler.clone(); + + let uuid = self + .scheduler + .add(build_metrics_collector_job( + schedule.as_str(), + pool, + self.node.clone(), + )?) + .await?; + tracing::debug!( + job_id = uuid.to_string(), + "Started new job to collect metrics" + ); + Ok(()) + } + pub async fn add_reminder_to_close_expired_position_job( &self, pool: Pool>, @@ -366,3 +388,31 @@ fn build_remind_to_close_liquidated_position_notification_job( } }) } + +fn build_metrics_collector_job( + schedule: &str, + pool: Pool>, + node: Node, +) -> Result { + Job::new_async(schedule, move |_, _| { + let conn = match pool.get() { + Ok(conn) => conn, + Err(e) => { + return Box::pin(async move { + tracing::error!("Failed to get connection. Error: {e:#}") + }); + } + }; + let node = node.clone(); + Box::pin({ + async move { + match collect_metrics(conn, node) { + Ok(_) => {} + Err(error) => { + tracing::error!("Failed collecting metrics {error:#}"); + } + } + } + }) + }) +} diff --git a/coordinator/src/schema.rs b/coordinator/src/schema.rs index 333790ff9..08edc00f9 100644 --- a/coordinator/src/schema.rs +++ b/coordinator/src/schema.rs @@ -295,6 +295,14 @@ diesel::table! { } } +diesel::table! { + metrics (id) { + id -> Int4, + created_at -> Timestamptz, + on_chain_balance_sats -> Int8, + } +} + diesel::table! 
{ use diesel::sql_types::*; use super::sql_types::DirectionType; @@ -513,6 +521,7 @@ diesel::allow_tables_to_appear_in_same_query!( liquidity_options, liquidity_request_logs, matches, + metrics, orders, payments, polls, diff --git a/coordinator/src/settings.rs b/coordinator/src/settings.rs index 98483a836..d5dcf76de 100644 --- a/coordinator/src/settings.rs +++ b/coordinator/src/settings.rs @@ -64,6 +64,15 @@ pub struct Settings { /// * * * * * * * pub update_user_bonus_status_scheduler: String, + // We don't want the doc block below to be auto-formatted. + #[rustfmt::skip] + /// A cron syntax for collecting metrics + /// + /// The format is : + /// sec min hour day of month month day of week year + /// * * * * * * * + pub collect_metrics_scheduler: String, + // Location of the settings file in the file system. path: PathBuf, @@ -134,6 +143,7 @@ impl Settings { close_expired_position_scheduler: file.close_expired_position_scheduler, close_liquidated_position_scheduler: file.close_liquidated_position_scheduler, update_user_bonus_status_scheduler: file.update_user_bonus_status_scheduler, + collect_metrics_scheduler: file.collect_metrics_scheduler, path, whitelist_enabled: file.whitelist_enabled, whitelisted_makers: file.whitelisted_makers, @@ -156,6 +166,7 @@ pub struct SettingsFile { close_expired_position_scheduler: String, close_liquidated_position_scheduler: String, update_user_bonus_status_scheduler: String, + collect_metrics_scheduler: String, whitelist_enabled: bool, whitelisted_makers: Vec, @@ -175,6 +186,7 @@ impl From for SettingsFile { close_expired_position_scheduler: value.close_expired_position_scheduler, close_liquidated_position_scheduler: value.close_liquidated_position_scheduler, update_user_bonus_status_scheduler: value.update_user_bonus_status_scheduler, + collect_metrics_scheduler: value.collect_metrics_scheduler, whitelist_enabled: false, whitelisted_makers: value.whitelisted_makers, min_quantity: value.min_quantity, @@ -205,6 +217,7 @@ mod tests 
{ close_expired_position_scheduler: "baz".to_string(), close_liquidated_position_scheduler: "baz".to_string(), update_user_bonus_status_scheduler: "bazinga".to_string(), + collect_metrics_scheduler: "42".to_string(), whitelist_enabled: false, whitelisted_makers: vec![PublicKey::from_str( "0218845781f631c48f1c9709e23092067d06837f30aa0cd0544ac887fe91ddd166",