Skip to content

Commit

Permalink
refactor: use metrics instead of pomfrit
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef committed Feb 5, 2024
1 parent 3f2321c commit 06eb6cf
Show file tree
Hide file tree
Showing 9 changed files with 686 additions and 846 deletions.
1,276 changes: 621 additions & 655 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ version = "0.6.0"
[dependencies]
anyhow = "1"
async-trait = "0.1"
axum-core = "0.3.1"
base64 = "0.13.1"
futures = "0.3.23"
hex = "0.4.3"
Expand Down
12 changes: 7 additions & 5 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
[package]
name = "everscale-rpc-node"
version = "0.1.2"
version = "0.2.0"
edition = "2021"

[dependencies]
anyhow = "1.0"
argh = "0.1"
async-trait = "0.1"
config = { version = "0.13", default-features = false, features = ["yaml"] }
config = { version = "0.14.0", default-features = false, features = ["yaml"] }
everscale-network = "0.5.3"
futures-util = "0.3"
is-terminal = "0.4"
pomfrit = "0.1"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand All @@ -35,6 +32,11 @@ ton_types = { git = "https://github.com/broxus/ton-labs-types.git" }

everscale-rpc-server = { path = "../server" }

metrics-exporter-prometheus = "0.13.0"

[features]
default = []
venom = ["ton-indexer/venom", "ton_block/venom"]

[package.metadata.cargo-machete]
ignored = ["ton-block", "ton-types"]
20 changes: 18 additions & 2 deletions node/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
Expand All @@ -17,7 +17,7 @@ pub struct AppConfig {
/// Prometheus metrics exporter settings.
/// Completely disable when not specified
#[serde(default)]
pub metrics_settings: Option<pomfrit::Config>,
pub metrics_settings: Option<MetricsConfig>,

/// Light node settings
#[serde(default)]
Expand Down Expand Up @@ -117,6 +117,22 @@ impl NodeConfig {
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct MetricsConfig {
    /// Listen address of metrics. Used by the client to gather prometheus metrics.
    /// Default: `127.0.0.1:10000`
    // NOTE(review): the `Default` impl currently binds `0.0.0.0:10000`, which
    // contradicts the documented default above — confirm which is intended.
    pub listen_address: SocketAddr,
}

impl Default for MetricsConfig {
    fn default() -> Self {
        Self {
            // Bind to loopback by default so the default matches the documented
            // `127.0.0.1:10000` on `MetricsConfig::listen_address`. The previous
            // `0.0.0.0` default exposed the metrics endpoint on every interface,
            // which should be an explicit opt-in, not the default.
            listen_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10000),
        }
    }
}

impl Default for NodeConfig {
fn default() -> Self {
Self {
Expand Down
162 changes: 17 additions & 145 deletions node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::io::IsTerminal;
use std::net::SocketAddr;
use std::panic;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::{Context, Result};
use argh::FromArgs;
use everscale_rpc_server::RpcState;
use is_terminal::IsTerminal;
use pomfrit::formatter::*;
use std::sync::Arc;
use ton_indexer::Engine;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -111,18 +111,9 @@ async fn run(app: App) -> Result<()> {
}

// Create metrics exporter
let (_exporter, metrics_writer) = pomfrit::create_exporter(config.metrics_settings).await?;
metrics_writer.spawn({
let rpc_state = rpc_state.clone();
let engine = engine.clone();
move |buf| {
buf.write(ExplorerMetrics {
engine: &engine,
panicked: &panicked,
rpc_state: &rpc_state,
});
}
});
if let Some(c) = config.metrics_settings {
install_monitoring(c.listen_address)?;
}
tracing::info!("initialized exporter");

// Start the engine
Expand All @@ -139,134 +130,15 @@ async fn run(app: App) -> Result<()> {
futures_util::future::pending().await
}

/// Borrowed handles bundled together so the `Display` impl below can render
/// every node metric in a single pass of the Prometheus text format.
struct ExplorerMetrics<'a> {
    /// Indexer engine; source of the TON-indexer, RocksDB and cache statistics.
    engine: &'a Engine,
    /// Flag set when a panic occurred; exported as the `panicked` metric.
    panicked: &'a AtomicBool,
    /// RPC server state; source of the jrpc/proto request counters.
    rpc_state: &'a RpcState,
}

impl std::fmt::Display for ExplorerMetrics<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let panicked = self.panicked.load(Ordering::Acquire) as u8;
f.begin_metric("panicked").value(panicked)?;

// TON indexer
let indexer_metrics = self.engine.metrics();

let last_mc_utime = indexer_metrics.last_mc_utime.load(Ordering::Acquire);
if last_mc_utime > 0 {
f.begin_metric("ton_indexer_mc_time_diff")
.value(indexer_metrics.mc_time_diff.load(Ordering::Acquire))?;
f.begin_metric("ton_indexer_sc_time_diff").value(
indexer_metrics
.shard_client_time_diff
.load(Ordering::Acquire),
)?;

f.begin_metric("ton_indexer_last_mc_utime")
.value(last_mc_utime)?;
}

let last_mc_block_seqno = indexer_metrics.last_mc_block_seqno.load(Ordering::Acquire);
if last_mc_block_seqno > 0 {
f.begin_metric("ton_indexer_last_mc_block_seqno")
.value(last_mc_block_seqno)?;
}

let last_shard_client_mc_block_seqno = indexer_metrics
.last_shard_client_mc_block_seqno
.load(Ordering::Acquire);
if last_shard_client_mc_block_seqno > 0 {
f.begin_metric("ton_indexer_last_sc_block_seqno")
.value(last_shard_client_mc_block_seqno)?;
}

// jemalloc

let broxus_util::alloc::profiling::JemallocStats {
allocated,
active,
metadata,
resident,
mapped,
retained,
dirty,
fragmentation,
} = broxus_util::alloc::profiling::fetch_stats().map_err(|e| {
tracing::error!("failed to fetch allocator stats: {e:?}");
std::fmt::Error
})?;

f.begin_metric("jemalloc_allocated_bytes")
.value(allocated)?;
f.begin_metric("jemalloc_active_bytes").value(active)?;
f.begin_metric("jemalloc_metadata_bytes").value(metadata)?;
f.begin_metric("jemalloc_resident_bytes").value(resident)?;
f.begin_metric("jemalloc_mapped_bytes").value(mapped)?;
f.begin_metric("jemalloc_retained_bytes").value(retained)?;
f.begin_metric("jemalloc_dirty_bytes").value(dirty)?;
f.begin_metric("jemalloc_fragmentation_bytes")
.value(fragmentation)?;

// RocksDB

let ton_indexer::RocksdbStats {
whole_db_stats,
block_cache_usage,
block_cache_pined_usage,
} = self.engine.get_memory_usage_stats().map_err(|e| {
tracing::error!("failed to fetch rocksdb stats: {e:?}");
std::fmt::Error
})?;

f.begin_metric("rocksdb_block_cache_usage_bytes")
.value(block_cache_usage)?;
f.begin_metric("rocksdb_block_cache_pined_usage_bytes")
.value(block_cache_pined_usage)?;
f.begin_metric("rocksdb_memtable_total_size_bytes")
.value(whole_db_stats.mem_table_total)?;
f.begin_metric("rocksdb_memtable_unflushed_size_bytes")
.value(whole_db_stats.mem_table_unflushed)?;
f.begin_metric("rocksdb_memtable_cache_bytes")
.value(whole_db_stats.cache_total)?;

let internal_metrics = self.engine.internal_metrics();

f.begin_metric("shard_states_cache_len")
.value(internal_metrics.shard_states_cache_len)?;
f.begin_metric("shard_states_operations_len")
.value(internal_metrics.shard_states_operations_len)?;
f.begin_metric("block_applying_operations_len")
.value(internal_metrics.block_applying_operations_len)?;
f.begin_metric("next_block_applying_operations_len")
.value(internal_metrics.next_block_applying_operations_len)?;

let cells_cache_stats = internal_metrics.cells_cache_stats;
f.begin_metric("cells_cache_hits")
.value(cells_cache_stats.hits)?;
f.begin_metric("cells_cache_requests")
.value(cells_cache_stats.requests)?;
f.begin_metric("cells_cache_occupied")
.value(cells_cache_stats.occupied)?;
f.begin_metric("cells_cache_hits_ratio")
.value(cells_cache_stats.hits_ratio)?;
f.begin_metric("cells_cache_size_bytes")
.value(cells_cache_stats.size_bytes)?;

// RPC

f.begin_metric("jrpc_enabled").value(1)?;

let jrpc = self.rpc_state.jrpc_metrics();
f.begin_metric("jrpc_total").value(jrpc.total)?;
f.begin_metric("jrpc_errors").value(jrpc.errors)?;
f.begin_metric("jrpc_not_found").value(jrpc.not_found)?;

let proto = self.rpc_state.proto_metrics();
f.begin_metric("proto_total").value(proto.total)?;
f.begin_metric("proto_errors").value(proto.errors)?;
f.begin_metric("proto_not_found").value(proto.not_found)?;

Ok(())
}
/// Installs the global Prometheus metrics exporter with an HTTP scrape
/// endpoint bound to `metrics_addr`.
///
/// Histograms whose names start with `time` get an exponential set of bucket
/// boundaries (in seconds) suitable for latencies from microseconds to hours.
fn install_monitoring(metrics_addr: SocketAddr) -> Result<()> {
    use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};

    // Bucket boundaries, in seconds, applied to every `time*` histogram.
    const EXPONENTIAL_SECONDS: &[f64] = &[
        0.000001, 0.0001, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
        60.0, 120.0, 300.0, 600.0, 3600.0,
    ];

    let builder = PrometheusBuilder::new()
        .set_buckets_for_metric(Matcher::Prefix("time".to_string()), EXPONENTIAL_SECONDS)?
        .with_http_listener(metrics_addr);

    builder
        .install()
        .context("Failed installing metrics exporter")
}
4 changes: 2 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "everscale-rpc-server"
version = "0.2.3"
version = "0.3.0"
edition = "2021"

[dependencies]
Expand All @@ -20,11 +20,11 @@ rlimit = "0.9.1"
rustc-hash = "1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", features = ["sync"] }
tower = "0.4.12"
tower-http = { version = "0.4.0", features = ["cors", "timeout"] }
tracing = "0.1.37"
metrics = "0.22.0"
weedb = { version = "0.1", features = ["zstd", "lz4", "jemalloc"] }

nekoton-abi = { git = "https://github.com/broxus/nekoton.git", default-features = false }
Expand Down
10 changes: 5 additions & 5 deletions server/src/jrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ impl JrpcServer {
last_mc_utime: metrics.last_mc_utime.load(Ordering::Acquire),
mc_time_diff: metrics.mc_time_diff.load(Ordering::Acquire),
shard_client_time_diff: metrics.shard_client_time_diff.load(Ordering::Acquire),
smallest_known_lt: self.state.persistent_storage.as_ref().map(|storage| {
storage
.min_tx_lt
.load(Ordering::Acquire)
}),
smallest_known_lt: self
.state
.persistent_storage
.as_ref()
.map(|storage| storage.min_tx_lt.load(Ordering::Acquire)),
})
}

Expand Down
41 changes: 15 additions & 26 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -109,8 +108,8 @@ impl RpcState {
engine: Default::default(),
runtime_storage: Default::default(),
persistent_storage,
jrpc_counters: Default::default(),
proto_counters: Default::default(),
jrpc_counters: Counters::named("jrpc"),
proto_counters: Counters::named("proto"),
})
}

Expand Down Expand Up @@ -193,14 +192,6 @@ impl RpcState {
Server::new(self)?.serve()
}

pub fn jrpc_metrics(&self) -> Metrics {
self.jrpc_counters.metrics()
}

pub fn proto_metrics(&self) -> Metrics {
self.proto_counters.metrics()
}

pub fn process_blocks_edge(&self) {
if let Some(storage) = &self.persistent_storage {
storage.update_snapshot();
Expand Down Expand Up @@ -269,32 +260,30 @@ pub struct InitialState {
pub smallest_known_lt: u64,
}

#[derive(Default)]
struct Counters {
total: AtomicU64,
not_found: AtomicU64,
errors: AtomicU64,
total: metrics::Counter,
not_found: metrics::Counter,
errors: metrics::Counter,
}

impl Counters {
fn named(name: &'static str) -> Self {
Self {
total: metrics::counter!("jrpc_total", "kind" => name),
not_found: metrics::counter!("jrpc_not_found", "kind" => name),
errors: metrics::counter!("jrpc_errors", "kind" => name),
}
}
fn increase_total(&self) {
self.total.fetch_add(1, Ordering::Relaxed);
self.total.increment(1)
}

fn increase_not_found(&self) {
self.not_found.fetch_add(1, Ordering::Relaxed);
self.not_found.increment(1)
}

fn increase_errors(&self) {
self.errors.fetch_add(1, Ordering::Relaxed);
}

fn metrics(&self) -> Metrics {
Metrics {
total: self.total.load(Ordering::Relaxed),
not_found: self.not_found.load(Ordering::Relaxed),
errors: self.errors.load(Ordering::Relaxed),
}
self.errors.increment(1)
}
}

Expand Down
6 changes: 1 addition & 5 deletions server/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,7 @@ impl ProtoServer {
.state
.persistent_storage
.as_ref()
.map(|storage| {
storage
.min_tx_lt
.load(Ordering::Acquire)
})
.map(|storage| storage.min_tx_lt.load(Ordering::Acquire))
.unwrap_or(u64::MAX),
},
))
Expand Down

0 comments on commit 06eb6cf

Please sign in to comment.