add queries to db_bench, measure query disk read
cswinter committed Feb 24, 2024
1 parent fcaff67 commit 18e605f

Showing 12 changed files with 206 additions and 34 deletions.
114 changes: 108 additions & 6 deletions src/bin/db_bench.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;

+ use locustdb::LocustDB;
use rand::{FromEntropy, Rng};
use structopt::StructOpt;
use tempfile::tempdir;
@@ -71,7 +72,7 @@ async fn main() {

let perf_counter = db.perf_counter();

log::info!("Done");
log::info!("Ingestion done");

// count number of files in db_path and all subdirectories
let file_count = walkdir::WalkDir::new(db_path.path())
@@ -87,6 +88,54 @@ async fn main() {
.map(|e| e.metadata().unwrap().len())
.sum::<u64>();

+ println!();
+ query(
+ &db,
+ "Querying 100 related columns in small table",
+ &format!(
+ "SELECT {} FROM {}",
+ (0..100)
+ .map(|c| format!("col_{c}"))
+ .collect::<Vec<String>>()
+ .join(", "),
+ small_tables[0]
+ ),
+ )
+ .await;
+ query(
+ &db,
+ "Querying full small table",
+ &format!("SELECT * FROM {}", small_tables[1]),
+ )
+ .await;
+ query(
+ &db,
+ "Querying 100 random columns in small table",
+ &format!(
+ "SELECT {} FROM {}",
+ (0..100)
+ .map(|_| format!("col_{}", rng.gen_range(0u64, 1 << load_factor)))
+ .collect::<Vec<String>>()
+ .join(", "),
+ small_tables[2]
+ ),
+ )
+ .await;
+ query(&db, "Querying 10 related columns in large table", "SELECT col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9 FROM event_log")
+ .await;
+ query(
+ &db,
+ "Querying 10 random columns in large table",
+ &format!(
+ "SELECT {} FROM event_log",
+ (0..10)
+ .map(|_| format!("col_{}", rng.gen_range(0u64, 1 << load_factor)))
+ .collect::<Vec<String>>()
+ .join(", ")
+ ),
+ ).await;
+
println!();
println!("elapsed: {:?}", start_time.elapsed());
println!(
"total uncompressed data: {}",
@@ -136,12 +185,30 @@ async fn main() {
" ingestion bytes: {}",
locustdb::unit_fmt::bite(perf_counter.network_read_ingestion_bytes() as usize)
);
println!("query");
println!(
" files opened: {}",
perf_counter.files_opened_partition(),
);
println!(
" disk read: {}",
locustdb::unit_fmt::bite(perf_counter.disk_read_partition_bytes() as usize)
);
}

- // 1 large tables with 2^(2N) rows and 2^(2N) columns each
- // let large_table = format!(
- // "{}_{i}",
- // random_word::gen(random_word::Lang::En).to_string();
- // );
+ async fn query(db: &LocustDB, description: &str, query: &str) {
+ let evicted_bytes = db.evict_cache();
+ log::info!("Evicted {}", locustdb::unit_fmt::bite(evicted_bytes));
+ println!("{}", description);
+ let response = db.run_query(query, false, vec![]).await.unwrap().unwrap();
+ println!(
+ "Returned {} columns with {} rows in {:?} ({} files opened, {})",
+ response.rows.first().map(|r| r.len()).unwrap_or(0),
+ response.rows.len(),
+ Duration::from_nanos(response.stats.runtime_ns),
+ response.stats.files_opened,
+ locustdb::unit_fmt::bite(response.stats.disk_read_bytes as usize),
+ );
+ }

fn create_locustdb(db_path: PathBuf) -> Arc<locustdb::LocustDB> {
@@ -253,3 +320,38 @@ fn small_table_names(load_factor: u64) -> Vec<String> {
// network
// ingestion requests: 14
// ingestion bytes: 280MiB

+ // $ RUST_BACKTRACE=1 cargo run --bin db_bench -- --load-factor=8
+ // Querying 100 related columns in small table
+ // Returned 100 columns with 256 rows in 252.602ms (100 files opened, 24.0MiB)
+ // Querying full small table
+ // Returned 257 columns with 256 rows in 684.762302ms (257 files opened, 61.8MiB)
+ // Querying 100 random columns in small table
+ // Returned 100 columns with 256 rows in 195.1599ms (77 files opened, 18.5MiB)
+ // Querying 10 related columns in large table
+ // Returned 10 columns with 2048 rows in 610.756901ms (10 files opened, 62.1MiB)
+ // Querying 10 random columns in large table
+ // Returned 10 columns with 2048 rows in 605.867601ms (10 files opened, 68.4MiB)
+
+ // elapsed: 27.505325227s
+ // total uncompressed data: 256MiB
+ // total size on disk: 278MiB (SmallRng output is compressible)
+ // total files: 795
+ // total events: 73728
+ // disk writes
+ //   total: 566MiB
+ //   wal: 280MiB
+ //   partition: 275MiB
+ //   compaction: 0.000B
+ //   meta store: 11.9MiB
+ // files created
+ //   total: 813
+ //   wal: 14
+ //   partition: 794
+ //   meta: 5
+ // network
+ //   ingestion requests: 14
+ //   ingestion bytes: 280MiB
+ // query
+ //   files opened: 454
+ //   disk read: 235MiB
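
A sketch of how a further measurement could be added with the same helper (hypothetical, not part of this commit). Each query() call evicts the cache first, so the reported files-opened and disk-read figures reflect cold reads:

    // Hypothetical extra benchmark query; a single-column read would isolate
    // the fixed per-file overhead from the cost of column volume.
    query(
        &db,
        "Querying single column in large table",
        "SELECT col_0 FROM event_log",
    )
    .await;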
3 changes: 2 additions & 1 deletion src/disk_store/interface.rs
@@ -1,11 +1,12 @@
use serde::{Deserialize, Serialize};

use crate::mem_store::column::Column;
+ use crate::perf_counter::QueryPerfCounter;
use crate::scheduler::inner_locustdb::InnerLocustDB;


pub trait ColumnLoader: Sync + Send + 'static {
- fn load_column(&self, table_name: &str, partition: PartitionID, column_name: &str) -> Vec<Column>;
+ fn load_column(&self, table_name: &str, partition: PartitionID, column_name: &str, perf_counter: &QueryPerfCounter) -> Vec<Column>;
fn load_column_range(&self, start: PartitionID, end: PartitionID, column_name: &str, ldb: &InnerLocustDB);
}

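The src/perf_counter.rs diff is not shown in this view. A minimal sketch of what the new QueryPerfCounter plausibly looks like, inferred from the call sites in this commit (scanned, disk_read, rows_scanned, files_opened, disk_read_bytes); the field names and the choice to count one file open per disk_read call are assumptions:

    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Default)]
    pub struct QueryPerfCounter {
        rows_scanned: AtomicU64,
        files_opened: AtomicU64,
        disk_read_bytes: AtomicU64,
    }

    impl QueryPerfCounter {
        // Scan threads record progress lock-free; the query task holds an
        // Arc<QueryPerfCounter> and reads the totals when building QueryStats.
        pub fn scanned(&self, rows: u64) {
            self.rows_scanned.fetch_add(rows, Ordering::SeqCst);
        }
        pub fn disk_read(&self, bytes: u64) {
            self.files_opened.fetch_add(1, Ordering::SeqCst); // assumption: one file per read
            self.disk_read_bytes.fetch_add(bytes, Ordering::SeqCst);
        }
        pub fn rows_scanned(&self) -> u64 { self.rows_scanned.load(Ordering::SeqCst) }
        pub fn files_opened(&self) -> u64 { self.files_opened.load(Ordering::SeqCst) }
        pub fn disk_read_bytes(&self) -> u64 { self.disk_read_bytes.load(Ordering::SeqCst) }
    }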
3 changes: 2 additions & 1 deletion src/disk_store/noop_storage.rs
@@ -1,11 +1,12 @@
use crate::mem_store::column::Column;
use crate::disk_store::interface::*;
+ use crate::perf_counter::QueryPerfCounter;
use crate::scheduler::inner_locustdb::InnerLocustDB;

pub struct NoopStorage;

impl ColumnLoader for NoopStorage {
- fn load_column(&self, _: &str, _: PartitionID, _: &str) -> Vec<Column> {
+ fn load_column(&self, _: &str, _: PartitionID, _: &str, _: &QueryPerfCounter) -> Vec<Column> {
panic!("Can't load column from NoopStorage!")
}
fn load_column_range(&self, _: PartitionID, _: PartitionID, _: &str, _: &InnerLocustDB) {}
8 changes: 6 additions & 2 deletions src/disk_store/v2.rs
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use super::interface::{ColumnLoader, PartitionMetadata};
use crate::logging_client::EventBuffer;
use crate::mem_store::Column;
- use crate::perf_counter::PerfCounter;
+ use crate::perf_counter::{PerfCounter, QueryPerfCounter};

#[derive(Serialize, Deserialize)]
pub struct WALSegment<'a> {
@@ -39,6 +39,7 @@ pub trait Storage: Send + Sync + 'static {
partition: PartitionID,
table_name: &str,
column_name: &str,
+ perf_counter: &QueryPerfCounter,
) -> Vec<Column>;
}

@@ -48,8 +49,9 @@ impl ColumnLoader for StorageV2 {
table_name: &str,
partition: super::interface::PartitionID,
column_name: &str,
+ perf_counter: &QueryPerfCounter,
) -> Vec<Column> {
- Storage::load_column(self, partition, table_name, column_name)
+ Storage::load_column(self, partition, table_name, column_name, perf_counter)
}

fn load_column_range(
@@ -257,6 +259,7 @@ impl Storage for StorageV2 {
partition: PartitionID,
table_name: &str,
column_name: &str,
+ perf_counter: &QueryPerfCounter,
) -> Vec<Column> {
// TODO: efficient access
let subpartition_key = self
@@ -279,6 +282,7 @@ impl Storage for StorageV2 {
.join(format!("{}_{}.part", partition, subpartition_key));
let data = self.writer.load(&path).unwrap();
self.perf_counter.disk_read_partition(data.len() as u64);
+ perf_counter.disk_read(data.len() as u64);
bincode::deserialize(&data).unwrap()
}
}
6 changes: 4 additions & 2 deletions src/engine/data_types/data.rs
@@ -462,8 +462,10 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for &'a [T] {
Box::new(&self[from..to])
}

- fn append_all(&mut self, _other: &dyn Data<'a>, _count: usize) -> Option<BoxedData<'a>> {
- panic!("append_all on borrow")
+ fn append_all(&mut self, other: &dyn Data<'a>, count: usize) -> Option<BoxedData<'a>> {
+ let mut owned = Vec::from(*self);
+ owned.append_all(other, count);
+ Some(Box::new(owned))
}

fn type_error(&self, func_name: &str) -> String {
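The append_all change above replaces a panic with copy-on-append semantics: appending to a borrowed slice now clones it into an owned Vec, appends, and hands the owned buffer back to the caller. A standalone sketch of the pattern (simplified; the real code goes through the Data<'a> trait and returns Option<BoxedData<'a>>):

    // Simplified copy-on-append: clone the borrowed data once, then append.
    fn append_all_borrowed<T: Clone>(borrowed: &[T], other: &[T], count: usize) -> Vec<T> {
        let mut owned = borrowed.to_vec();
        owned.extend_from_slice(&other[..count.min(other.len())]);
        owned // the caller adopts the owned buffer in place of the borrow
    }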
36 changes: 21 additions & 15 deletions src/engine/execution/query_task.rs
@@ -6,14 +6,15 @@ use std::mem;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
+ use std::time::Instant;

use serde::{Deserialize, Serialize};
- use time::OffsetDateTime;

use crate::engine::*;
use crate::ingest::raw_val::RawVal;
use crate::mem_store::column::DataSource;
use crate::mem_store::partition::Partition;
+ use crate::perf_counter::QueryPerfCounter;
use crate::scheduler::disk_read_scheduler::DiskReadScheduler;
use crate::scheduler::*;
use crate::syntax::expression::*;
@@ -28,8 +29,9 @@ pub struct QueryTask {
partitions: Vec<Arc<Partition>>,
referenced_cols: HashSet<String>,
output_colnames: Vec<String>,
- start_time_ns: i128,
+ start_time: Instant,
db: Arc<DiskReadScheduler>,
+ perf_counter: Arc<QueryPerfCounter>,

// Lifetime is not actually static, but tied to the lifetime of this struct.
// There is currently no good way to express this constraint in Rust.
@@ -45,7 +47,6 @@ pub struct QueryState<'a> {
completed_batches: usize,
partial_results: Vec<BatchResult<'a>>,
explains: Vec<String>,
- rows_scanned: usize,
rows_collected: usize,
colstacks: Vec<Vec<HashMap<String, Arc<dyn DataSource>>>>,
}
@@ -61,7 +62,9 @@ pub struct QueryOutput {
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueryStats {
pub runtime_ns: u64,
- pub rows_scanned: usize,
+ pub rows_scanned: u64,
+ pub files_opened: u64,
+ pub disk_read_bytes: u64,
}

impl QueryTask {
@@ -73,7 +76,7 @@ impl QueryTask {
db: Arc<DiskReadScheduler>,
sender: SharedSender<QueryResult>,
) -> Result<QueryTask, QueryError> {
- let start_time_ns = OffsetDateTime::unix_epoch().unix_timestamp_nanos();
+ let start_time = Instant::now();
if query.is_select_star() {
query.select = find_all_cols(&source)
.into_iter()
@@ -100,14 +103,14 @@ impl QueryTask {
partitions: source,
referenced_cols,
output_colnames,
- start_time_ns,
+ start_time,
db,
+ perf_counter: Arc::default(),

unsafe_state: Mutex::new(QueryState {
partial_results: Vec::new(),
completed_batches: 0,
explains: Vec::new(),
- rows_scanned: 0,
rows_collected: 0,
colstacks: Vec::new(),
}),
@@ -123,8 +126,10 @@ impl QueryTask {
rows: vec![],
query_plans: Default::default(),
stats: QueryStats {
- runtime_ns: 0,
+ runtime_ns: start_time.elapsed().as_nanos() as u64,
rows_scanned: 0,
+ files_opened: 0,
+ disk_read_bytes: 0,
},
}));
}
@@ -140,7 +145,7 @@ impl QueryTask {
let mut explains = Vec::new();
while let Some((partition, id)) = self.next_partition() {
let show = self.show.iter().any(|&x| x == id);
- let cols = partition.get_cols(&self.referenced_cols, &self.db);
+ let cols = partition.get_cols(&self.referenced_cols, &self.db, self.perf_counter.as_ref());
rows_scanned += cols.iter().next().map_or(0, |c| c.1.len());
let unsafe_cols = unsafe {
mem::transmute::<
@@ -229,7 +234,7 @@ impl QueryTask {
}
state.completed_batches += result.batch_count;
state.explains.extend(explains);
- state.rows_scanned += rows_scanned;
+ self.perf_counter.scanned(rows_scanned as u64);
state.rows_collected += rows_collected;

let result = unsafe { mem::transmute::<_, BatchResult<'static>>(result) };
@@ -266,9 +271,9 @@ impl QueryTask {
)
.unwrap()
.0;
- self.convert_to_output_format(&full_result, state.rows_scanned, &state.explains)
+ self.convert_to_output_format(&full_result, &state.explains)
} else {
- self.convert_to_output_format(&full_result, state.rows_scanned, &state.explains)
+ self.convert_to_output_format(&full_result, &state.explains)
};
self.sender.send(Ok(final_result));
self.completed.store(true, Ordering::SeqCst);
@@ -309,7 +314,6 @@ impl QueryTask {
fn convert_to_output_format(
&self,
full_result: &BatchResult,
- rows_scanned: usize,
explains: &[String],
) -> QueryOutput {
let lo = self.final_pass.as_ref().map(|x| &x.limit).unwrap_or(&self.main_phase.limit);
@@ -339,8 +343,10 @@ impl QueryTask {
rows: result_rows,
query_plans,
stats: QueryStats {
- runtime_ns: (OffsetDateTime::unix_epoch().unix_timestamp_nanos() - self.start_time_ns) as u64,
- rows_scanned,
+ runtime_ns: self.start_time.elapsed().as_nanos() as u64,
+ rows_scanned: self.perf_counter.rows_scanned(),
+ files_opened: self.perf_counter.files_opened(),
+ disk_read_bytes: self.perf_counter.disk_read_bytes(),
},
}
}
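Two themes run through the query_task.rs changes: per-query counters replace the rows_scanned bookkeeping in QueryState, and timing moves from wall-clock arithmetic on OffsetDateTime to the monotonic Instant clock, which cannot jump backwards if the system clock is adjusted mid-query. An illustrative helper (not from the commit) showing the new timing pattern:

    use std::time::Instant;

    // Returns the closure's result plus elapsed nanoseconds, matching the
    // u64 runtime_ns field of QueryStats.
    fn time_ns<T>(f: impl FnOnce() -> T) -> (T, u64) {
        let start = Instant::now();
        let result = f();
        (result, start.elapsed().as_nanos() as u64)
    }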
4 changes: 4 additions & 0 deletions src/locustdb.rs
@@ -179,6 +179,10 @@ impl LocustDB {
pub fn force_flush(&self) {
self.inner_locustdb.wal_flush();
}

+ pub fn evict_cache(&self) -> usize {
+ self.inner_locustdb.evict_cache()
+ }
}

#[derive(Clone)]
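The new evict_cache method is what makes the cold-read measurements in db_bench meaningful: dropping cached partitions before each query forces the files-opened and disk-read counters to reflect actual disk access. The call pattern, mirroring the query() helper above (sql is a placeholder; error handling elided):

    let evicted_bytes = db.evict_cache(); // drop cached partitions so reads hit disk
    log::info!("Evicted {}", locustdb::unit_fmt::bite(evicted_bytes));
    let response = db.run_query(sql, false, vec![]).await.unwrap().unwrap();
    println!(
        "{} files opened, {}",
        response.stats.files_opened,
        locustdb::unit_fmt::bite(response.stats.disk_read_bytes as usize),
    );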