From 18e605f8b3cacc4553c5127990a87d9fbcd0dbf4 Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sat, 24 Feb 2024 09:59:26 -0800 Subject: [PATCH] add queries to db_bench, measure query disk read --- src/bin/db_bench.rs | 114 +++++++++++++++++++++++++-- src/disk_store/interface.rs | 3 +- src/disk_store/noop_storage.rs | 3 +- src/disk_store/v2.rs | 8 +- src/engine/data_types/data.rs | 6 +- src/engine/execution/query_task.rs | 36 +++++---- src/locustdb.rs | 4 + src/mem_store/partition.rs | 4 +- src/perf_counter.rs | 42 ++++++++++ src/scheduler/disk_read_scheduler.rs | 5 +- src/scheduler/inner_locustdb.rs | 13 ++- src/syntax/parser.rs | 2 +- 12 files changed, 206 insertions(+), 34 deletions(-) diff --git a/src/bin/db_bench.rs b/src/bin/db_bench.rs index 2dae648f..b827208a 100644 --- a/src/bin/db_bench.rs +++ b/src/bin/db_bench.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; +use locustdb::LocustDB; use rand::{FromEntropy, Rng}; use structopt::StructOpt; use tempfile::tempdir; @@ -71,7 +72,7 @@ async fn main() { let perf_counter = db.perf_counter(); - log::info!("Done"); + log::info!("Ingestion done"); // count number of files in db_path and all subdirectories let file_count = walkdir::WalkDir::new(db_path.path()) @@ -87,6 +88,54 @@ async fn main() { .map(|e| e.metadata().unwrap().len()) .sum::(); + println!(); + query( + &db, + "Querying 100 related columns in small table", + &format!( + "SELECT {} FROM {}", + (0..100) + .map(|c| format!("col_{c}")) + .collect::>() + .join(", "), + small_tables[0] + ), + ) + .await; + query( + &db, + "Querying full small table", + &format!("SELECT * FROM {}", small_tables[1]), + ) + .await; + query( + &db, + "Querying 100 random columns in small table", + &format!( + "SELECT {} FROM {}", + (0..100) + .map(|_| format!("col_{}", rng.gen_range(0u64, 1 << load_factor))) + .collect::>() + .join(", "), + small_tables[2] + ), + ) + .await; + query(&db, "Querying 10 related columns in large table", "SELECT col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9 FROM event_log") + .await; + query( + &db, + "Querying 10 random columns in large table", + &format!( + "SELECT {} FROM event_log", + (0..10) + .map(|_| format!("col_{}", rng.gen_range(0u64, 1 << load_factor))) + .collect::>() + .join(", ") + ), + ).await; + + println!(); println!("elapsed: {:?}", start_time.elapsed()); println!( "total uncompressed data: {}", @@ -136,12 +185,30 @@ async fn main() { " ingestion bytes: {}", locustdb::unit_fmt::bite(perf_counter.network_read_ingestion_bytes() as usize) ); + println!("query"); + println!( + " files opened: {}", + perf_counter.files_opened_partition(), + ); + println!( + " disk read: {}", + locustdb::unit_fmt::bite(perf_counter.disk_read_partition_bytes() as usize) + ); +} - // 1 large tables with 2^(2N) rows and 2^(2N) columns each - // let large_table = format!( - // "{}_{i}", - // random_word::gen(random_word::Lang::En).to_string(); - // ); +async fn query(db: &LocustDB, description: &str, query: &str) { + let evicted_bytes = db.evict_cache(); + log::info!("Evicted {}", locustdb::unit_fmt::bite(evicted_bytes)); + println!("{}", description); + let response = db.run_query(query, false, vec![]).await.unwrap().unwrap(); + println!( + "Returned {} columns with {} rows in {:?} ({} files opened, {})", + response.rows.first().map(|r| r.len()).unwrap_or(0), + response.rows.len(), + Duration::from_nanos(response.stats.runtime_ns), + response.stats.files_opened, + locustdb::unit_fmt::bite(response.stats.disk_read_bytes as 
usize), + ); } fn create_locustdb(db_path: PathBuf) -> Arc { @@ -253,3 +320,38 @@ fn small_table_names(load_factor: u64) -> Vec { // network // ingestion requests: 14 // ingestion bytes: 280MiB + +// $ RUST_BACKTRACE=1 cargo run --bin db_bench -- --load-factor=8 +// Querying 100 related columns in small table +// Returned 100 columns with 256 rows in 252.602ms (100 files opened, 24.0MiB) +// Querying full small table +// Returned 257 columns with 256 rows in 684.762302ms (257 files opened, 61.8MiB) +// Querying 100 random columns in small table +// Returned 100 columns with 256 rows in 195.1599ms (77 files opened, 18.5MiB) +// Querying 10 related columns in large table +// Returned 10 columns with 2048 rows in 610.756901ms (10 files opened, 62.1MiB) +// Querying 10 random columns in large table +// Returned 10 columns with 2048 rows in 605.867601ms (10 files opened, 68.4MiB) + +// elapsed: 27.505325227s +// total uncompressed data: 256MiB +// total size on disk: 278MiB (SmallRng output is compressible) +// total files: 795 +// total events: 73728 +// disk writes +// total: 566MiB +// wal: 280MiB +// partition: 275MiB +// compaction: 0.000B +// meta store: 11.9MiB +// files created +// total: 813 +// wal: 14 +// partition: 794 +// meta: 5 +// network +// ingestion requests: 14 +// ingestion bytes: 280MiB +// query +// files opened: 454 +// disk read: 235MiB \ No newline at end of file diff --git a/src/disk_store/interface.rs b/src/disk_store/interface.rs index a65bfc91..4d26f5a1 100644 --- a/src/disk_store/interface.rs +++ b/src/disk_store/interface.rs @@ -1,11 +1,12 @@ use serde::{Deserialize, Serialize}; use crate::mem_store::column::Column; +use crate::perf_counter::QueryPerfCounter; use crate::scheduler::inner_locustdb::InnerLocustDB; pub trait ColumnLoader: Sync + Send + 'static { - fn load_column(&self, table_name: &str, partition: PartitionID, column_name: &str) -> Vec; + fn load_column(&self, table_name: &str, partition: PartitionID, column_name: &str, perf_counter: &QueryPerfCounter) -> Vec; fn load_column_range(&self, start: PartitionID, end: PartitionID, column_name: &str, ldb: &InnerLocustDB); } diff --git a/src/disk_store/noop_storage.rs b/src/disk_store/noop_storage.rs index f807675b..4dac9fbc 100644 --- a/src/disk_store/noop_storage.rs +++ b/src/disk_store/noop_storage.rs @@ -1,11 +1,12 @@ use crate::mem_store::column::Column; use crate::disk_store::interface::*; +use crate::perf_counter::QueryPerfCounter; use crate::scheduler::inner_locustdb::InnerLocustDB; pub struct NoopStorage; impl ColumnLoader for NoopStorage { - fn load_column(&self, _: &str, _: PartitionID, _: &str) -> Vec { + fn load_column(&self, _: &str, _: PartitionID, _: &str, _: &QueryPerfCounter) -> Vec { panic!("Can't load column from NoopStorage!") } fn load_column_range(&self, _: PartitionID, _: PartitionID, _: &str, _: &InnerLocustDB) {} diff --git a/src/disk_store/v2.rs b/src/disk_store/v2.rs index c32ca4ce..b8d336ae 100644 --- a/src/disk_store/v2.rs +++ b/src/disk_store/v2.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use super::interface::{ColumnLoader, PartitionMetadata}; use crate::logging_client::EventBuffer; use crate::mem_store::Column; -use crate::perf_counter::PerfCounter; +use crate::perf_counter::{PerfCounter, QueryPerfCounter}; #[derive(Serialize, Deserialize)] pub struct WALSegment<'a> { @@ -39,6 +39,7 @@ pub trait Storage: Send + Sync + 'static { partition: PartitionID, table_name: &str, column_name: &str, + perf_counter: &QueryPerfCounter, ) -> Vec; } @@ -48,8 +49,9 @@ impl 
ColumnLoader for StorageV2 { table_name: &str, partition: super::interface::PartitionID, column_name: &str, + perf_counter: &QueryPerfCounter, ) -> Vec { - Storage::load_column(self, partition, table_name, column_name) + Storage::load_column(self, partition, table_name, column_name, perf_counter) } fn load_column_range( @@ -257,6 +259,7 @@ impl Storage for StorageV2 { partition: PartitionID, table_name: &str, column_name: &str, + perf_counter: &QueryPerfCounter, ) -> Vec { // TODO: efficient access let subpartition_key = self @@ -279,6 +282,7 @@ impl Storage for StorageV2 { .join(format!("{}_{}.part", partition, subpartition_key)); let data = self.writer.load(&path).unwrap(); self.perf_counter.disk_read_partition(data.len() as u64); + perf_counter.disk_read(data.len() as u64); bincode::deserialize(&data).unwrap() } } diff --git a/src/engine/data_types/data.rs b/src/engine/data_types/data.rs index d111b526..43fc7490 100644 --- a/src/engine/data_types/data.rs +++ b/src/engine/data_types/data.rs @@ -462,8 +462,10 @@ impl<'a, T: VecData + 'a> Data<'a> for &'a [T] { Box::new(&self[from..to]) } - fn append_all(&mut self, _other: &dyn Data<'a>, _count: usize) -> Option> { - panic!("append_all on borrow") + fn append_all(&mut self, other: &dyn Data<'a>, count: usize) -> Option> { + let mut owned = Vec::from(*self); + owned.append_all(other, count); + Some(Box::new(owned)) } fn type_error(&self, func_name: &str) -> String { diff --git a/src/engine/execution/query_task.rs b/src/engine/execution/query_task.rs index 7a2699e0..85fa38fb 100644 --- a/src/engine/execution/query_task.rs +++ b/src/engine/execution/query_task.rs @@ -6,14 +6,15 @@ use std::mem; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::sync::Mutex; +use std::time::Instant; use serde::{Deserialize, Serialize}; -use time::OffsetDateTime; use crate::engine::*; use crate::ingest::raw_val::RawVal; use crate::mem_store::column::DataSource; use crate::mem_store::partition::Partition; +use crate::perf_counter::QueryPerfCounter; use crate::scheduler::disk_read_scheduler::DiskReadScheduler; use crate::scheduler::*; use crate::syntax::expression::*; @@ -28,8 +29,9 @@ pub struct QueryTask { partitions: Vec>, referenced_cols: HashSet, output_colnames: Vec, - start_time_ns: i128, + start_time: Instant, db: Arc, + perf_counter: Arc, // Lifetime is not actually static, but tied to the lifetime of this struct. // There is currently no good way to express this constraint in Rust. 
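(Note on the data.rs hunk above: `append_all` on a borrowed slice no longer panics. It promotes the borrow to an owned `Vec` via `Vec::from(*self)`, appends into that, and hands the new allocation back to the caller. The following standalone sketch, outside the patch, illustrates the same copy-on-append idea; `Append`, its `append_all` method, and the `Vec`/slice impls here are simplified stand-ins, not the actual `Data`/`BoxedData` machinery.)

// Standalone sketch of the copy-on-append pattern used in data.rs above.
// `Append` is a simplified stand-in for the real `Data` trait.
trait Append<T> {
    /// Append `count` elements from `other`; return a new owned buffer
    /// if the receiver could not be grown in place.
    fn append_all(&mut self, other: &[T], count: usize) -> Option<Vec<T>>;
}

impl<T: Clone> Append<T> for Vec<T> {
    fn append_all(&mut self, other: &[T], count: usize) -> Option<Vec<T>> {
        self.extend_from_slice(&other[..count.min(other.len())]);
        None // grown in place, no new allocation to hand back
    }
}

impl<'a, T: Clone> Append<T> for &'a [T] {
    fn append_all(&mut self, other: &[T], count: usize) -> Option<Vec<T>> {
        // A borrowed slice cannot grow, so copy it into an owned Vec first
        // and append there, mirroring the Vec::from(*self) change above.
        let mut owned = self.to_vec();
        owned.append_all(other, count);
        Some(owned)
    }
}

fn main() {
    let mut borrowed: &[u32] = &[1, 2, 3];
    let owned = borrowed.append_all(&[4, 5, 6], 2).expect("new owned buffer");
    assert_eq!(owned, vec![1, 2, 3, 4, 5]);
}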
@@ -45,7 +47,6 @@ pub struct QueryState<'a> { completed_batches: usize, partial_results: Vec>, explains: Vec, - rows_scanned: usize, rows_collected: usize, colstacks: Vec>>>, } @@ -61,7 +62,9 @@ pub struct QueryOutput { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct QueryStats { pub runtime_ns: u64, - pub rows_scanned: usize, + pub rows_scanned: u64, + pub files_opened: u64, + pub disk_read_bytes: u64, } impl QueryTask { @@ -73,7 +76,7 @@ impl QueryTask { db: Arc, sender: SharedSender, ) -> Result { - let start_time_ns = OffsetDateTime::unix_epoch().unix_timestamp_nanos(); + let start_time = Instant::now(); if query.is_select_star() { query.select = find_all_cols(&source) .into_iter() @@ -100,14 +103,14 @@ impl QueryTask { partitions: source, referenced_cols, output_colnames, - start_time_ns, + start_time, db, + perf_counter: Arc::default(), unsafe_state: Mutex::new(QueryState { partial_results: Vec::new(), completed_batches: 0, explains: Vec::new(), - rows_scanned: 0, rows_collected: 0, colstacks: Vec::new(), }), @@ -123,8 +126,10 @@ impl QueryTask { rows: vec![], query_plans: Default::default(), stats: QueryStats { - runtime_ns: 0, + runtime_ns: start_time.elapsed().as_nanos() as u64, rows_scanned: 0, + files_opened: 0, + disk_read_bytes: 0, }, })); } @@ -140,7 +145,7 @@ impl QueryTask { let mut explains = Vec::new(); while let Some((partition, id)) = self.next_partition() { let show = self.show.iter().any(|&x| x == id); - let cols = partition.get_cols(&self.referenced_cols, &self.db); + let cols = partition.get_cols(&self.referenced_cols, &self.db, self.perf_counter.as_ref()); rows_scanned += cols.iter().next().map_or(0, |c| c.1.len()); let unsafe_cols = unsafe { mem::transmute::< @@ -229,7 +234,7 @@ impl QueryTask { } state.completed_batches += result.batch_count; state.explains.extend(explains); - state.rows_scanned += rows_scanned; + self.perf_counter.scanned(rows_scanned as u64); state.rows_collected += rows_collected; let result = unsafe { mem::transmute::<_, BatchResult<'static>>(result) }; @@ -266,9 +271,9 @@ impl QueryTask { ) .unwrap() .0; - self.convert_to_output_format(&full_result, state.rows_scanned, &state.explains) + self.convert_to_output_format(&full_result, &state.explains) } else { - self.convert_to_output_format(&full_result, state.rows_scanned, &state.explains) + self.convert_to_output_format(&full_result, &state.explains) }; self.sender.send(Ok(final_result)); self.completed.store(true, Ordering::SeqCst); @@ -309,7 +314,6 @@ impl QueryTask { fn convert_to_output_format( &self, full_result: &BatchResult, - rows_scanned: usize, explains: &[String], ) -> QueryOutput { let lo = self.final_pass.as_ref().map(|x| &x.limit).unwrap_or(&self.main_phase.limit); @@ -339,8 +343,10 @@ impl QueryTask { rows: result_rows, query_plans, stats: QueryStats { - runtime_ns: (OffsetDateTime::unix_epoch().unix_timestamp_nanos() - self.start_time_ns) as u64, - rows_scanned, + runtime_ns: self.start_time.elapsed().as_nanos() as u64, + rows_scanned: self.perf_counter.rows_scanned(), + files_opened: self.perf_counter.files_opened(), + disk_read_bytes: self.perf_counter.disk_read_bytes(), }, } } diff --git a/src/locustdb.rs b/src/locustdb.rs index bd3b3030..13e9a24f 100644 --- a/src/locustdb.rs +++ b/src/locustdb.rs @@ -179,6 +179,10 @@ impl LocustDB { pub fn force_flush(&self) { self.inner_locustdb.wal_flush(); } + + pub fn evict_cache(&self) -> usize { + self.inner_locustdb.evict_cache() + } } #[derive(Clone)] diff --git a/src/mem_store/partition.rs 
b/src/mem_store/partition.rs index f83c103a..de5542fd 100644 --- a/src/mem_store/partition.rs +++ b/src/mem_store/partition.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use crate::disk_store::interface::*; use crate::ingest::buffer::Buffer; use crate::mem_store::*; +use crate::perf_counter::QueryPerfCounter; use crate::scheduler::disk_read_scheduler::DiskReadScheduler; // Table, Partition, Column @@ -99,11 +100,12 @@ impl Partition { &self, referenced_cols: &HashSet, drs: &DiskReadScheduler, + perf_counter: &QueryPerfCounter, ) -> HashMap> { let mut columns = HashMap::>::new(); for handle in &self.cols { if referenced_cols.contains(handle.name()) { - let column = drs.get_or_load(handle); + let column = drs.get_or_load(handle, perf_counter); columns.insert(handle.name().to_string(), Arc::new(column)); } } diff --git a/src/perf_counter.rs b/src/perf_counter.rs index ecc2f5bf..25d864c3 100644 --- a/src/perf_counter.rs +++ b/src/perf_counter.rs @@ -23,6 +23,13 @@ pub struct PerfCounter { network_read_ingestion_bytes: AtomicU64, } +#[derive(Debug, Default)] +pub struct QueryPerfCounter { + pub rows_scanned: AtomicU64, + pub files_opened: AtomicU64, + pub disk_read_bytes: AtomicU64, +} + impl PerfCounter { pub fn new() -> PerfCounter { PerfCounter::default() @@ -132,4 +139,39 @@ impl PerfCounter { pub fn ingestion_requests(&self) -> u64 { self.ingestion_requests.load(ORDERING) } + + pub fn disk_read_partition_bytes(&self) -> u64 { + self.disk_read_partition_bytes.load(ORDERING) + } + + pub fn files_opened_partition(&self) -> u64 { + self.file_accessed_partition.load(ORDERING) + } } + +impl QueryPerfCounter { + pub fn new() -> QueryPerfCounter { + QueryPerfCounter::default() + } + + pub fn rows_scanned(&self) -> u64 { + self.rows_scanned.load(ORDERING) + } + + pub fn files_opened(&self) -> u64 { + self.files_opened.load(ORDERING) + } + + pub fn disk_read_bytes(&self) -> u64 { + self.disk_read_bytes.load(ORDERING) + } + + pub fn scanned(&self, rows: u64) { + self.rows_scanned.fetch_add(rows, ORDERING); + } + + pub fn disk_read(&self, bytes: u64) { + self.files_opened.fetch_add(1, ORDERING); + self.disk_read_bytes.fetch_add(bytes, ORDERING); + } +} \ No newline at end of file diff --git a/src/scheduler/disk_read_scheduler.rs b/src/scheduler/disk_read_scheduler.rs index 759145c6..5df64de9 100644 --- a/src/scheduler/disk_read_scheduler.rs +++ b/src/scheduler/disk_read_scheduler.rs @@ -11,6 +11,7 @@ use crate::disk_store::interface::PartitionID; use crate::mem_store::partition::ColumnHandle; use crate::mem_store::partition::Partition; use crate::mem_store::*; +use crate::perf_counter::QueryPerfCounter; use crate::scheduler::inner_locustdb::InnerLocustDB; pub struct DiskReadScheduler { @@ -115,7 +116,7 @@ impl DiskReadScheduler { debug!("Scheduled sequential reads. 
Queue: {:#?}", &*task_queue); } - pub fn get_or_load(&self, handle: &ColumnHandle) -> Arc { + pub fn get_or_load(&self, handle: &ColumnHandle, perf_counter: &QueryPerfCounter) -> Arc { loop { if handle.is_resident() { let mut maybe_column = handle.try_get(); @@ -147,7 +148,7 @@ impl DiskReadScheduler { debug!("Point lookup for {}.{}", handle.name(), handle.id()); let columns = { let _token = self.reader_semaphore.access(); - self.disk_store.load_column(&handle.key().table, handle.id(), handle.name()) + self.disk_store.load_column(&handle.key().table, handle.id(), handle.name(), perf_counter) }; // Need to hold lock when we put new value into lru // TODO: also populate columns that were colocated in same subpartition diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index eba44696..a60ce2c5 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -374,9 +374,7 @@ impl InnerLocustDB { match ldb.lru.evict() { Some(victim) => { let tables = ldb.tables.read().unwrap(); - for t in tables.values() { - mem_usage_bytes -= t.evict(&victim); - } + mem_usage_bytes -= tables[&victim.table].evict(&victim); } None => { if ldb.opts.mem_size_limit_tables > 0 { @@ -410,6 +408,15 @@ impl InnerLocustDB { pub fn perf_counter(&self) -> &PerfCounter { self.perf_counter.as_ref() } + + pub(crate) fn evict_cache(&self) -> usize { + let tables = self.tables.read().unwrap(); + let mut bytes_evicted = 0; + while let Some(victim) = self.lru.evict() { + bytes_evicted += tables[&victim.table].evict(&victim); + } + bytes_evicted + } } impl Drop for InnerLocustDB { diff --git a/src/syntax/parser.rs b/src/syntax/parser.rs index 1ed8be52..4cff1a49 100644 --- a/src/syntax/parser.rs +++ b/src/syntax/parser.rs @@ -182,7 +182,7 @@ fn get_order_by(order_by: Option>) -> Result, fn get_limit(limit: Option) -> Result { match limit { Some(ASTNode::Value(Value::Number(int, _))) => Ok(int.parse::().unwrap()), - None => Ok(100), + None => Ok(u64::MAX), _ => Err(QueryError::NotImplemented(format!( "Invalid expression in limit clause: {:?}", limit