Skip to content

Commit

Permalink
Merge pull request #2576 from get10101/feat/stats
Browse files Browse the repository at this point in the history
feat: store some metrics in db
  • Loading branch information
bonomat authored May 31, 2024
2 parents 0c4b220 + ce43bdd commit a544aa7
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rollover_window_close_scheduler = "0 5 13 * * 5,6"
close_expired_position_scheduler = "0 0 12 * * *"
close_liquidated_position_scheduler = "0 0 12 * * *"
update_user_bonus_status_scheduler = "0 0 0 * * *"
collect_metrics_scheduler = "0 0 * * * *"
whitelist_enabled = false
whitelisted_makers = []
min_quantity = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rollover_window_close_scheduler = "0 5 22 * * *"
close_expired_position_scheduler = "0 0 12 * * *"
close_liquidated_position_scheduler = "0 0 12 * * *"
update_user_bonus_status_scheduler = "0 0 0 * * *"
collect_metrics_scheduler = "0 0 * * * *"
whitelist_enabled = false
# Default testnet maker
whitelisted_makers = ["035eccdd1f05c65b433cf38e3b2597e33715e0392cb14d183e812f1319eb7b6794"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop table if exists metrics;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table if not exists metrics
(
id SERIAL PRIMARY KEY NOT NULL,
created_at timestamp WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
on_chain_balance_sats BIGINT NOT NULL
);
24 changes: 5 additions & 19 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use coordinator::dlc_handler::DlcHandler;
use coordinator::logger;
use coordinator::message::spawn_delivering_messages_to_authenticated_users;
use coordinator::message::NewUserMessage;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node::expired_positions;
use coordinator::node::liquidated_positions;
use coordinator::node::rollover;
Expand Down Expand Up @@ -47,7 +45,6 @@ use xxi_node::node::event::NodeEventHandler;
use xxi_node::seed::Bip39Seed;
use xxi_node::storage::DlcChannelEvent;

const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200);
const LIQUIDATED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60);
Expand Down Expand Up @@ -76,8 +73,6 @@ async fn main() -> Result<()> {
}),
);

let exporter = init_meter();

let opts = Opts::read();
let data_dir = opts.data_dir()?;
let address = opts.p2p_address;
Expand Down Expand Up @@ -233,19 +228,6 @@ async fn main() -> Result<()> {
}
});

tokio::spawn({
let node = node.clone();
async move {
loop {
let node = node.clone();
spawn_blocking(move || metrics::collect(node))
.await
.expect("To spawn blocking thread");
tokio::time::sleep(PROCESS_PROMETHEUS_METRICS).await;
}
}
});

tokio::spawn({
let node = node.clone();
async move {
Expand Down Expand Up @@ -324,7 +306,6 @@ async fn main() -> Result<()> {
node.clone(),
pool.clone(),
settings.clone(),
exporter,
NODE_ALIAS,
trading_sender,
tx_orderbook_feed,
Expand Down Expand Up @@ -363,6 +344,11 @@ async fn main() -> Result<()> {
.await
.expect("To add the close liquidated position reminder job");

scheduler
.add_collect_metrics_job(pool.clone())
.await
.expect("To add the collect metrics job");

scheduler
.start()
.await
Expand Down
16 changes: 16 additions & 0 deletions coordinator/src/db/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::schema::metrics;
use anyhow::ensure;
use anyhow::Result;
use diesel::ExpressionMethods;
use diesel::PgConnection;
use diesel::RunQueryDsl;

pub fn create_metrics_entry(conn: &mut PgConnection, on_chain_balance: u64) -> Result<()> {
let affected_rows = diesel::insert_into(metrics::table)
.values(metrics::on_chain_balance_sats.eq(on_chain_balance as i64))
.execute(conn)?;

ensure!(affected_rows > 0, "Could not insert metric entry");

Ok(())
}
1 change: 1 addition & 0 deletions coordinator/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod dlc_protocols;
pub mod hodl_invoice;
pub mod last_outbound_dlc_message;
pub mod liquidity_options;
pub mod metrics;
pub mod polls;
pub mod positions;
pub mod reported_errors;
Expand Down
2 changes: 1 addition & 1 deletion coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub mod dlc_handler;
pub mod dlc_protocol;
pub mod logger;
pub mod message;
pub mod metrics;
mod metrics;
pub mod node;
pub mod notifications;
pub mod orderbook;
Expand Down
231 changes: 17 additions & 214 deletions coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,217 +1,20 @@
use crate::db;
use crate::node::storage::NodeStorage;
use crate::node::Node;
use crate::storage::CoordinatorTenTenOneStorage;
use lazy_static::lazy_static;
use opentelemetry::global;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry::sdk::export::metrics::aggregation;
use opentelemetry::sdk::metrics::controllers;
use opentelemetry::sdk::metrics::processors;
use opentelemetry::sdk::metrics::selectors;
use opentelemetry::Context;
use opentelemetry::KeyValue;
use opentelemetry_prometheus::PrometheusExporter;
use std::sync::Arc;
use std::time::Duration;
use xxi_node::commons::ContractSymbol;
use xxi_node::commons::Direction;

lazy_static! {
pub static ref METER: Meter = global::meter("maker");

// channel details metrics
pub static ref CHANNEL_BALANCE_SATOSHI: ObservableGauge<u64> = METER
.u64_observable_gauge("channel_balance_satoshi")
.with_description("Current channel balance in satoshi")
.init();
pub static ref CHANNEL_OUTBOUND_CAPACITY_SATOSHI: ObservableGauge<u64> = METER
.u64_observable_gauge("channel_outbound_capacity_satoshi")
.with_description("Channel outbound capacity in satoshi")
.init();
pub static ref CHANNEL_INBOUND_CAPACITY_SATOSHI: ObservableGauge<u64> = METER
.u64_observable_gauge("channel_inbound_capacity_satoshi")
.with_description("Channel inbound capacity in satoshi")
.init();
pub static ref CHANNEL_IS_USABLE: ObservableGauge<u64> = METER
.u64_observable_gauge("channel_is_usable")
.with_description("If a channel is usable")
.init();
pub static ref DLC_CHANNELS_AMOUNT: ObservableGauge<u64> = METER
.u64_observable_gauge("dlc_channel_amount")
.with_description("Number of DLC channels")
.init();
pub static ref PUNISHED_DLC_CHANNELS_AMOUNT: ObservableGauge<u64> = METER
.u64_observable_gauge("punished_dlc_channel_amount")
.with_description("Number of punished DLC channels")
.init();

// general node metrics
pub static ref CONNECTED_PEERS: ObservableGauge<u64> = METER
.u64_observable_gauge("node_connected_peers_total")
.with_description("Total number of connected peers")
.init();
pub static ref NODE_BALANCE_SATOSHI: ObservableGauge<u64> = METER
.u64_observable_gauge("node_balance_satoshi")
.with_description("Node balance in satoshi")
.init();

// position metrics
pub static ref POSITION_QUANTITY: ObservableGauge<f64> = METER
.f64_observable_gauge("position_quantity_contracts")
.with_description("Current open position in contracts")
.init();
pub static ref POSITION_MARGIN: ObservableGauge<i64> = METER
.i64_observable_gauge("position_margin_sats")
.with_description("Current open position margin in sats")
.init();
}

pub fn init_meter() -> PrometheusExporter {
let controller = controllers::basic(processors::factory(
selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
aggregation::cumulative_temporality_selector(),
))
.with_collect_period(Duration::from_secs(10))
.build();

opentelemetry_prometheus::exporter(controller).init()
}

pub fn collect(node: Node) {
let cx = opentelemetry::Context::current();
position_metrics(&cx, &node);

let inner_node = node.inner;

node_metrics(&cx, inner_node);
}

fn position_metrics(cx: &Context, node: &Node) {
let mut conn = match node.pool.get() {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Failed to get pool connection. Error: {e:?}");
return;
}
};

let positions = match db::positions::Position::get_all_open_positions(&mut conn) {
Ok(positions) => positions,
Err(e) => {
tracing::error!("Failed to get positions. Error: {e:?}");
return;
}
};

let mut margin_long = 0;
let mut margin_short = 0;
let mut quantity_long = 0.0;
let mut quantity_short = 0.0;

// Note: we should filter positions here by BTCUSD once we have multiple contract symbols

for position in positions {
debug_assert!(
position.contract_symbol == ContractSymbol::BtcUsd,
"We should filter positions here by BTCUSD once we have multiple contract symbols"
);
match position.trader_direction {
Direction::Long => {
// TODO: fix me: this was meant to be the traders margin
margin_long += position.coordinator_margin;
quantity_long += position.quantity;
}
Direction::Short => {
margin_short += position.coordinator_margin;
quantity_short += position.quantity;
}
}
}
POSITION_QUANTITY.observe(
cx,
quantity_long as f64,
&[
KeyValue::new("symbol", "BTCUSD"),
KeyValue::new("status", "open"),
KeyValue::new("direction", "long"),
],
);
POSITION_QUANTITY.observe(
cx,
quantity_short as f64,
&[
KeyValue::new("symbol", "BTCUSD"),
KeyValue::new("status", "open"),
KeyValue::new("direction", "short"),
],
);
POSITION_MARGIN.observe(
cx,
margin_long,
&[
KeyValue::new("symbol", "BTCUSD"),
KeyValue::new("status", "open"),
KeyValue::new("direction", "long"),
],
);
POSITION_MARGIN.observe(
cx,
margin_short,
&[
KeyValue::new("symbol", "BTCUSD"),
KeyValue::new("status", "open"),
KeyValue::new("direction", "short"),
],
);
}

fn node_metrics(
cx: &Context,
inner_node: Arc<
xxi_node::node::Node<
bdk_file_store::Store<bdk::wallet::ChangeSet>,
CoordinatorTenTenOneStorage,
NodeStorage,
>,
>,
) {
let connected_peers = inner_node.list_peers().len();
CONNECTED_PEERS.observe(cx, connected_peers as u64, &[]);

let balance = inner_node.get_on_chain_balance();

NODE_BALANCE_SATOSHI.observe(
cx,
balance.confirmed,
&[
KeyValue::new("type", "on-chain"),
KeyValue::new("status", "confirmed"),
],
);
NODE_BALANCE_SATOSHI.observe(
cx,
balance.immature,
&[
KeyValue::new("type", "on-chain"),
KeyValue::new("status", "immature"),
],
);
NODE_BALANCE_SATOSHI.observe(
cx,
balance.trusted_pending,
&[
KeyValue::new("type", "on-chain"),
KeyValue::new("status", "trusted_pending"),
],
);
NODE_BALANCE_SATOSHI.observe(
cx,
balance.untrusted_pending,
&[
KeyValue::new("type", "on-chain"),
KeyValue::new("status", "untrusted_pending"),
],
);
use anyhow::Result;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::PooledConnection;
use diesel::PgConnection;

pub fn collect_metrics(
mut conn: PooledConnection<ConnectionManager<PgConnection>>,
node: Node,
) -> Result<()> {
let balance = node.inner.wallet().get_balance();
db::metrics::create_metrics_entry(
&mut conn,
balance.confirmed + balance.untrusted_pending + balance.trusted_pending + balance.immature,
)?;
// TODO: also collect LN balance

Ok(())
}
Loading

0 comments on commit a544aa7

Please sign in to comment.