Skip to content

Commit

Permalink
Add query specific aggregated stats
Browse files Browse the repository at this point in the history
  • Loading branch information
athoscouto committed Feb 26, 2024
1 parent 5b28653 commit 9e934b1
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 8 deletions.
4 changes: 4 additions & 0 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,10 @@ impl<W: Wal> Connection<W> {
self.stats.inc_rows_written(rows_written);
self.stats.inc_query(elapsed);
let weight = rows_read + rows_written;
self.stats.register_query(
&sql,
crate::stats::QueryStats::new(elapsed, rows_read, rows_written),
);
if self.stats.qualifies_as_top_query(weight) {
self.stats.add_top_query(crate::stats::TopQuery::new(
sql.clone(),
Expand Down
45 changes: 40 additions & 5 deletions libsql-server/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use itertools::Itertools;
use serde::Serialize;

use axum::extract::{Path, State};
Expand All @@ -8,7 +9,7 @@ use uuid::Uuid;

use crate::namespace::{MakeNamespace, NamespaceName};
use crate::replication::FrameNo;
use crate::stats::{SlowestQuery, Stats, TopQuery};
use crate::stats::{QueryStats, SlowestQuery, Stats, TopQuery};

use super::AppState;

Expand All @@ -18,13 +19,12 @@ pub struct StatsResponse {
pub rows_read_count: u64,
pub rows_written_count: u64,
pub storage_bytes_used: u64,
pub query_count: u64,
pub query_latency: u64,
pub write_requests_delegated: u64,
pub replication_index: FrameNo,
pub top_queries: Vec<TopQuery>,
pub slowest_queries: Vec<SlowestQuery>,
pub embedded_replica_frames_replicated: u64,
pub queries: QueriesStatsResponse,
}

impl From<&Stats> for StatsResponse {
Expand All @@ -37,8 +37,6 @@ impl From<&Stats> for StatsResponse {
write_requests_delegated: stats.write_requests_delegated(),
replication_index: stats.get_current_frame_no(),
embedded_replica_frames_replicated: stats.get_embedded_replica_frames_replicated(),
query_count: stats.get_query_count(),
query_latency: stats.get_query_latency(),
top_queries: stats
.top_queries()
.read()
Expand All @@ -53,6 +51,7 @@ impl From<&Stats> for StatsResponse {
.iter()
.cloned()
.collect(),
queries: QueriesStatsResponse::from(stats),
}
}
}
Expand All @@ -63,6 +62,42 @@ impl From<Stats> for StatsResponse {
}
}

/// Serializable summary of query activity, exposed as the `queries` field of
/// `StatsResponse`.
#[derive(Serialize)]
pub struct QueriesStatsResponse {
/// Identifier copied from the underlying `QueriesStats` (`None` when it has none).
pub id: Option<Uuid>,
/// Value of `Stats::get_query_count()` at the time the response was built.
pub count: u64,
/// Value of `Stats::get_query_latency()` at the time the response was built.
pub elapsed_ms: u64,
/// One entry per query currently held in the `QueriesStats` map.
pub stats: Vec<QueryAndStats>,
}

impl From<&Stats> for QueriesStatsResponse {
    /// Snapshots the global query counters and the per-query aggregates of `stats`.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the per-query stats is poisoned.
    fn from(stats: &Stats) -> Self {
        // Read the global counters first, before taking the read lock.
        let (count, elapsed_ms) = (stats.get_query_count(), stats.get_query_latency());

        let guard = stats.get_queries().read().unwrap();
        let id = guard.id();

        // Copy every (sql, aggregate) pair out of the map while the lock is held.
        let per_query = guard
            .stats()
            .iter()
            .map(|(sql, aggregate)| QueryAndStats {
                query: sql.clone(),
                stat: aggregate.clone(),
            })
            .collect_vec();

        Self {
            id,
            count,
            elapsed_ms,
            stats: per_query,
        }
    }
}

/// A query's SQL text paired with its aggregated statistics.
#[derive(Serialize)]
pub struct QueryAndStats {
/// SQL text; this is the key under which the stats are stored in `QueriesStats`.
pub query: String,
/// Aggregated stats for `query`. Because of `#[serde(flatten)]`, its fields are
/// serialized inline next to `query` rather than as a nested object.
#[serde(flatten)]
pub stat: QueryStats,
}

pub(super) async fn handle_stats<M: MakeNamespace, C>(
State(app_state): State<Arc<AppState<M, C>>>,
Path(namespace): Path<String>,
Expand Down
113 changes: 110 additions & 3 deletions libsql-server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock, Weak};

use itertools::Itertools;
use metrics::{counter, gauge, increment_counter};
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use tokio::time::Duration;
Expand Down Expand Up @@ -52,6 +53,100 @@ impl SlowestQuery {
}
}

/// Execution statistics for one SQL statement, either for a single run or
/// aggregated over several runs via `merge`.
///
/// The derived `PartialOrd`/`Ord` compare fields in declaration order, so
/// `elapsed_ms` is the primary sort key. Reordering the fields changes how
/// `QueriesStats::register_query` ranks entries for eviction.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct QueryStats {
/// Total elapsed time (milliseconds, per the field name).
pub elapsed_ms: u64,
/// Number of executions folded into these stats (`new` starts it at 1).
pub count: u64,
/// Total number of rows written.
pub rows_written: u64,
/// Total number of rows read.
pub rows_read: u64,
}

impl QueryStats {
    /// Builds the stats for a single execution of a query.
    ///
    /// `count` is initialised to 1 because the value describes exactly one run.
    pub fn new(elapsed_ms: u64, rows_read: u64, rows_written: u64) -> Self {
        Self {
            elapsed_ms,
            count: 1,
            rows_read,
            rows_written,
        }
    }

    /// Returns stats with every counter at zero.
    ///
    /// This is the identity element for [`QueryStats::merge`] and is the same
    /// value as `QueryStats::default()`.
    pub fn empty() -> Self {
        Self::default()
    }

    /// Returns a new value whose counters are the field-wise sums of `self`
    /// and `another`; neither input is modified.
    ///
    /// Sums saturate at `u64::MAX` instead of overflowing, so a long-lived
    /// aggregate can never panic (debug builds) or wrap around to a small
    /// value (release builds).
    pub fn merge(&self, another: &QueryStats) -> Self {
        Self {
            elapsed_ms: self.elapsed_ms.saturating_add(another.elapsed_ms),
            count: self.count.saturating_add(another.count),
            rows_read: self.rows_read.saturating_add(another.rows_read),
            rows_written: self.rows_written.saturating_add(another.rows_written),
        }
    }
}

/// Bounded collection of per-query aggregated statistics, keyed by SQL text.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct QueriesStats {
/// Identifier of this collection; `QueriesStats::new` sets it to a random UUID,
/// while `Default` leaves it as `None`.
#[serde(default)]
id: Option<Uuid>,

/// Elapsed-time cutoff (same unit as `QueryStats::elapsed_ms`) that
/// `register_query` uses to reject low-latency queries. It starts at 0 and is
/// updated each time `register_query` evicts entries.
#[serde(default)]
stats_threshold: AtomicU64,
/// Aggregated stats per SQL text; `register_query` trims this map back to at
/// most 30 entries whenever a newly inserted query pushes it over that size.
#[serde(default)]
stats: HashMap<String, QueryStats>,
}

impl QueriesStats {
    /// Maximum number of distinct queries kept in `stats`.
    const MAX_TRACKED_QUERIES: usize = 30;

    /// Creates an empty collection tagged with a freshly generated random id,
    /// wrapped in `Arc<RwLock<_>>` so it can be shared and updated concurrently.
    pub fn new() -> Arc<RwLock<Self>> {
        Arc::new(RwLock::new(Self {
            id: Some(Uuid::new_v4()),
            ..Self::default()
        }))
    }

    /// Folds one execution of `sql` into the aggregated statistics.
    ///
    /// * A query that is already tracked is always updated, so its `count` and
    ///   row totals stay accurate even for executions that took 0 ms.
    /// * A query that is not tracked yet is only admitted when this execution's
    ///   `elapsed_ms` is strictly greater than the current threshold.
    /// * When admitting a query pushes the map above
    ///   `MAX_TRACKED_QUERIES`, the lowest-ranked entries (by `QueryStats`
    ///   ordering, i.e. `elapsed_ms` first) are evicted and the threshold is
    ///   raised to the `elapsed_ms` of the lowest-ranked survivor.
    ///
    /// `sql` stays a `&String` so existing call sites keep compiling unchanged.
    pub fn register_query(&mut self, sql: &String, stat: QueryStats) {
        // Fast path: single lookup, no key allocation, no threshold check.
        if let Some(tracked) = self.stats.get_mut(sql) {
            *tracked = tracked.merge(&stat);
            return;
        }

        // `&mut self` gives exclusive access, so the atomic can be read directly.
        if stat.elapsed_ms <= *self.stats_threshold.get_mut() {
            return;
        }

        self.stats.insert(sql.clone(), stat);

        if self.stats.len() <= Self::MAX_TRACKED_QUERIES {
            return;
        }
        let excess = self.stats.len() - Self::MAX_TRACKED_QUERIES;

        // Rank borrowed entries instead of cloning the whole map. Ties on the
        // stats are broken by SQL text so eviction is deterministic.
        let mut ranked = self.stats.iter().collect_vec();
        ranked.sort_unstable_by(|a, b| a.1.cmp(b.1).then_with(|| a.0.cmp(b.0)));

        // `excess < ranked.len()` because MAX_TRACKED_QUERIES > 0, so this
        // index is always in bounds: it is the lowest-ranked surviving entry.
        let new_threshold = ranked[excess].1.elapsed_ms;

        // Only the keys that are about to be removed need to be owned.
        let evicted = ranked[..excess]
            .iter()
            .map(|&(sql, _)| sql.clone())
            .collect_vec();

        for sql in &evicted {
            self.stats.remove(sql);
        }

        *self.stats_threshold.get_mut() = new_threshold;
    }

    /// Returns the identifier of this collection, if any.
    pub fn id(&self) -> Option<Uuid> {
        self.id
    }

    /// Returns the aggregated stats, keyed by SQL text.
    pub fn stats(&self) -> &HashMap<String, QueryStats> {
        &self.stats
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Stats {
#[serde(skip)]
Expand Down Expand Up @@ -86,6 +181,8 @@ pub struct Stats {
query_count: AtomicU64,
#[serde(default)]
query_latency: AtomicU64,
#[serde(skip)]
queries: Arc<RwLock<QueriesStats>>,
}

impl Stats {
Expand All @@ -106,6 +203,8 @@ impl Stats {
this.id = Some(Uuid::new_v4());
}

this.queries = QueriesStats::new();

this.namespace = namespace;
let this = Arc::new(this);

Expand Down Expand Up @@ -188,14 +287,22 @@ impl Stats {
self.current_frame_no.load(Ordering::Relaxed)
}

pub(crate) fn get_query_count(&self) -> FrameNo {
pub(crate) fn get_query_count(&self) -> u64 {
self.query_count.load(Ordering::Relaxed)
}

pub(crate) fn get_query_latency(&self) -> FrameNo {
pub(crate) fn get_query_latency(&self) -> u64 {
self.query_latency.load(Ordering::Relaxed)
}

/// Returns a reference to the shared, lock-protected per-query statistics.
/// Callers must take the read or write lock themselves to access the contents.
pub(crate) fn get_queries(&self) -> &Arc<RwLock<QueriesStats>> {
&self.queries
}

/// Records one execution of `sql` in the per-query statistics by taking the
/// write lock and delegating to `QueriesStats::register_query`.
///
/// Panics if the lock is poisoned (the `unwrap` on `write()`).
pub(crate) fn register_query(&self, sql: &String, stat: QueryStats) {
self.queries.write().unwrap().register_query(sql, stat)
}

pub(crate) fn add_top_query(&self, query: TopQuery) {
let mut top_queries = self.top_queries.write().unwrap();
tracing::debug!(
Expand Down

0 comments on commit 9e934b1

Please sign in to comment.