Skip to content

Commit

Permalink
Add query percentile support to stats (#1112)
Browse files Browse the repository at this point in the history
* Record query latency percentiles with hdrhistogram

* Expose queries percentiles through stats response

* Simplify QueriesStats transformation to QueriesStatsResponse

* Reset query stats at the beginning of every hour (#1118)

* Add expires_at to control when QueriesStats should be reset

* Add query count and elapsed sum to query stats response

Also group all latency aggregations together under the elapsed key

* Remove option sprawl on QueriesStats fields

By making the whole struct optional where it is used

* Add created_at to stats queries response object

* Set queries stats to none when stats is created

This will make the API response the same when no queries
have been recorded on the queries stats struct. This can
happen right after:
- the stats struct initialization
- the stats queries object expiration

Example of stats API response when no queries have been
recorded:

{
  ...
  "queries": null
}

* Use if else instead of early return from stats to response
  • Loading branch information
athoscouto committed Mar 1, 2024
1 parent 06369fc commit d5f7147
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 37 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ aes = { version = "0.8.3", optional = true }
cbc = { version = "0.1.2", optional = true }
zerocopy = { version = "0.7.28", features = ["derive", "alloc"] }
hashbrown = { version = "0.14.3", features = ["serde"] }
hdrhistogram = "7.5.4"

[dev-dependencies]
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
Expand Down
70 changes: 54 additions & 16 deletions libsql-server/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use hdrhistogram::Histogram;
use itertools::Itertools;
use serde::Serialize;

Expand All @@ -26,7 +28,7 @@ pub struct StatsResponse {
pub embedded_replica_frames_replicated: u64,
pub query_count: u64,
pub elapsed_ms: u64,
pub queries: QueriesStatsResponse,
pub queries: Option<QueriesStatsResponse>,
}

impl From<&Stats> for StatsResponse {
Expand Down Expand Up @@ -55,7 +57,7 @@ impl From<&Stats> for StatsResponse {
.iter()
.cloned()
.collect(),
queries: QueriesStatsResponse::from(stats),
queries: stats.into(),
}
}
}
Expand All @@ -66,26 +68,62 @@ impl From<Stats> for StatsResponse {
}
}

#[derive(Serialize)]
#[derive(Serialize, Default)]
/// Aggregated latency figures for recorded queries, exposed under the `elapsed`
/// key of the queries stats response.
///
/// Percentile fields hold whatever unit was recorded into the source histogram;
/// `QueriesStats::register_query` records `elapsed.as_millis()`, i.e. milliseconds.
pub struct QueriesLatencyStats {
/// Total elapsed time of the recorded queries, in whole milliseconds.
pub sum: u64,
/// Histogram value at the 50th percentile.
pub p50: u64,
/// Histogram value at the 75th percentile.
pub p75: u64,
/// Histogram value at the 90th percentile.
pub p90: u64,
/// Histogram value at the 95th percentile.
pub p95: u64,
/// Histogram value at the 99th percentile.
pub p99: u64,
/// Histogram value at the 99.9th percentile.
pub p999: u64,
}

impl QueriesLatencyStats {
fn from(hist: &Histogram<u32>, sum: &Duration) -> Self {
QueriesLatencyStats {
sum: sum.as_millis() as u64,
p50: hist.value_at_percentile(50.0),
p75: hist.value_at_percentile(75.0),
p90: hist.value_at_percentile(90.0),
p95: hist.value_at_percentile(95.0),
p99: hist.value_at_percentile(99.0),
p999: hist.value_at_percentile(99.9),
}
}
}

#[derive(Serialize, Default)]
pub struct QueriesStatsResponse {
pub id: Option<Uuid>,
pub id: Uuid,
pub created_at: u64,
pub count: u64,
pub stats: Vec<QueryAndStats>,
pub elapsed: QueriesLatencyStats,
}

impl From<&Stats> for QueriesStatsResponse {
impl From<&Stats> for Option<QueriesStatsResponse> {
fn from(stats: &Stats) -> Self {
let queries = stats.get_queries().read().unwrap();
Self {
id: queries.id(),
stats: queries
.stats()
.iter()
.map(|(k, v)| QueryAndStats {
query: k.clone(),
elapsed_ms: v.elapsed.as_millis() as u64,
stat: v.clone(),
})
.collect_vec(),
if queries.as_ref().map_or(true, |q| q.expired()) {
Self::default()
} else {
let queries = queries.as_ref().unwrap();
Some(QueriesStatsResponse {
id: queries.id(),
created_at: queries.created_at().timestamp() as u64,
count: queries.count(),
elapsed: QueriesLatencyStats::from(queries.hist(), &queries.elapsed()),
stats: queries
.stats()
.iter()
.map(|(k, v)| QueryAndStats {
query: k.clone(),
elapsed_ms: v.elapsed.as_millis() as u64,
stat: v.clone(),
})
.collect_vec(),
})
}
}
}
Expand Down
88 changes: 67 additions & 21 deletions libsql-server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock, Weak};

use chrono::{DateTime, DurationRound, Utc};
use hdrhistogram::Histogram;
use metrics::{counter, gauge, histogram, increment_counter};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio::time::Duration;
use tracing::debug;
use tokio::time::{Duration, Instant};
use uuid::Uuid;

use crate::namespace::NamespaceName;
Expand Down Expand Up @@ -83,31 +84,46 @@ impl QueryStats {
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug)]
pub struct QueriesStats {
#[serde(default)]
id: Option<Uuid>,
#[serde(default)]
stats_threshold: AtomicU64,
#[serde(default)]
id: Uuid,
stats: HashMap<String, QueryStats>,
elapsed: Duration,
stats_threshold: u64,
hist: Histogram<u32>,
expires_at: Option<Instant>,
created_at: DateTime<Utc>,
}

impl QueriesStats {
fn new() -> Arc<RwLock<Self>> {
let mut this = QueriesStats::default();
this.id = Some(Uuid::new_v4());
Arc::new(RwLock::new(this))
fn new() -> Self {
Self {
id: Uuid::new_v4(),
stats: HashMap::new(),
elapsed: Duration::default(),
stats_threshold: 0,
hist: Histogram::<u32>::new(3).unwrap(),
expires_at: None,
created_at: Utc::now(),
}
}

fn with_expiration(expires_at: Instant) -> Self {
let mut this = Self::new();
this.expires_at = Some(expires_at);
this
}

fn register_query(&mut self, sql: &String, stat: QueryStats) {
self.elapsed += stat.elapsed;
let _ = self.hist.record(stat.elapsed.as_millis() as u64);

let (aggregated, new) = match self.stats.get(sql) {
Some(aggregated) => (aggregated.merge(&stat), false),
None => (stat, true),
};

debug!("query: {}, elapsed: {:?}", sql, aggregated.elapsed);
if (aggregated.elapsed.as_micros() as u64) < self.stats_threshold.load(Ordering::Relaxed) {
if (aggregated.elapsed.as_micros() as u64) < self.stats_threshold {
return;
}

Expand All @@ -132,8 +148,7 @@ impl QueriesStats {

fn update_threshold(&mut self) {
if let Some((_, v)) = self.min() {
self.stats_threshold
.store(v.elapsed.as_micros() as u64, Ordering::Relaxed);
self.stats_threshold = v.elapsed.as_micros() as u64;
}
}

Expand All @@ -144,13 +159,33 @@ impl QueriesStats {
}
}

pub(crate) fn id(&self) -> Option<Uuid> {
pub(crate) fn id(&self) -> Uuid {
self.id
}

/// Returns the retained per-query statistics, keyed by the SQL text of the query.
pub(crate) fn stats(&self) -> &HashMap<String, QueryStats> {
&self.stats
}

/// Returns the latency histogram; `register_query` records each query's
/// elapsed time into it in milliseconds.
pub(crate) fn hist(&self) -> &Histogram<u32> {
&self.hist
}

/// Reports whether the expiration instant of these stats lies in the past.
///
/// Stats created without an expiration never expire.
pub(crate) fn expired(&self) -> bool {
    match self.expires_at {
        Some(deadline) => deadline < Instant::now(),
        None => false,
    }
}

/// Returns the running sum of elapsed time over every query passed to
/// `register_query`.
pub(crate) fn elapsed(&self) -> &Duration {
&self.elapsed
}

/// Returns the number of samples held by the latency histogram.
///
/// NOTE(review): `register_query` discards the result of `hist.record`, so a
/// query whose latency could not be recorded would presumably not be counted
/// here — confirm against the hdrhistogram `len` semantics.
pub(crate) fn count(&self) -> u64 {
self.hist.len() as u64
}

/// Returns the UTC wall-clock time captured when these stats were constructed.
pub(crate) fn created_at(&self) -> &DateTime<Utc> {
&self.created_at
}
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -199,7 +234,7 @@ pub struct Stats {
#[serde(default)]
query_latency: AtomicU64,
#[serde(skip)]
queries: Arc<RwLock<QueriesStats>>,
queries: Arc<RwLock<Option<QueriesStats>>>,
}

impl Stats {
Expand All @@ -222,7 +257,6 @@ impl Stats {

let (update_sender, update_receiver) = mpsc::channel(256);

this.queries = QueriesStats::new();
this.namespace = namespace;
this.sender = Some(update_sender);

Expand Down Expand Up @@ -381,12 +415,16 @@ impl Stats {
self.query_latency.load(Ordering::Relaxed)
}

pub(crate) fn get_queries(&self) -> &Arc<RwLock<QueriesStats>> {
pub(crate) fn get_queries(&self) -> &Arc<RwLock<Option<QueriesStats>>> {
&self.queries
}

fn register_query(&self, sql: &String, stat: QueryStats) {
self.queries.write().unwrap().register_query(sql, stat)
let mut queries = self.queries.write().unwrap();
if queries.as_ref().map_or(true, |q| q.expired()) {
*queries = Some(QueriesStats::with_expiration(next_hour()))
}
queries.as_mut().unwrap().register_query(sql, stat)
}

fn add_top_query(&self, query: TopQuery) {
Expand Down Expand Up @@ -493,3 +531,11 @@ async fn try_persist_stats(stats: Weak<Stats>, path: &Path) -> anyhow::Result<()
tokio::fs::rename(temp_path, path).await?;
Ok(())
}

/// Returns the `Instant` at which the next UTC hour begins.
///
/// The wall-clock distance from now to the next hour boundary is computed with
/// `chrono` and then added to the current `Instant`.
///
/// # Panics
///
/// Panics if truncating to the hour fails, or if the distance to the boundary
/// cannot be converted into a standard `Duration`.
fn next_hour() -> Instant {
    let wall_now = Utc::now();
    let one_hour = chrono::Duration::hours(1);
    // Step one hour ahead, then round down to land exactly on the boundary.
    let boundary = (wall_now + one_hour).duration_trunc(one_hour).unwrap();
    Instant::now() + (boundary - wall_now).to_std().unwrap()
}

0 comments on commit d5f7147

Please sign in to comment.