From 33b84dbb3105e015799f488c5321052bec38f047 Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sat, 16 Mar 2024 10:31:03 -0700 Subject: [PATCH] Test with small batch size and fix streaming bug --- src/bin/repl/main.rs | 5 ++ src/engine/execution/batch_merging.rs | 5 +- src/engine/execution/executor.rs | 72 +++++++++++++++++++-------- src/engine/execution/query_task.rs | 33 ++++++++---- src/engine/planning/planner.rs | 4 +- src/engine/planning/query.rs | 6 ++- src/locustdb.rs | 23 +++++++++ src/scheduler/inner_locustdb.rs | 1 + tests/query_tests.rs | 65 +++++++++++++----------- 9 files changed, 150 insertions(+), 64 deletions(-) diff --git a/src/bin/repl/main.rs b/src/bin/repl/main.rs index 223dffb5..1a7d2479 100644 --- a/src/bin/repl/main.rs +++ b/src/bin/repl/main.rs @@ -94,6 +94,9 @@ struct Opt { /// Address to bind the server to #[structopt(long, default_value = "127.0.0.1:8080")] addrs: String, + + /// Maximum length of temporary buffer used in streaming stages during query execution + batch_size: usize, } fn main() { @@ -118,6 +121,7 @@ fn main() { cors_allow_all, cors_allow_origin, addrs, + batch_size, } = Opt::from_args(); let options = locustdb::Options { @@ -131,6 +135,7 @@ fn main() { max_wal_size_bytes, max_partition_size_bytes, partition_combine_factor: 4, + batch_size, }; if options.readahead > options.mem_size_limit_tables { diff --git a/src/engine/execution/batch_merging.rs b/src/engine/execution/batch_merging.rs index 71391eb1..23c7d4cc 100644 --- a/src/engine/execution/batch_merging.rs +++ b/src/engine/execution/batch_merging.rs @@ -66,6 +66,7 @@ pub fn combine<'a>( batch1: BatchResult<'a>, batch2: BatchResult<'a>, limit: usize, + batch_size: usize, ) -> Result, QueryError> { ensure!( batch1.projection.len() == batch2.projection.len(), @@ -155,7 +156,7 @@ pub fn combine<'a>( aggregates.push((aggregated.any(), aggregator)); } - let mut executor = qp.prepare(data)?; + let mut executor = qp.prepare(data, batch_size)?; let mut results = executor.prepare_no_columns(); executor.run(1, &mut results, batch1.show || batch2.show)?; @@ -261,7 +262,7 @@ pub fn combine<'a>( } order_by.push((merged_final_sort_col.any(), final_desc)); - let mut executor = qp.prepare(data)?; + let mut executor = qp.prepare(data, batch_size)?; let mut results = executor.prepare_no_columns(); executor.run(1, &mut results, batch1.show || batch2.show)?; let (columns, projection, _, order_by) = diff --git a/src/engine/execution/executor.rs b/src/engine/execution/executor.rs index 8d1dd4f0..552d1269 100644 --- a/src/engine/execution/executor.rs +++ b/src/engine/execution/executor.rs @@ -12,6 +12,7 @@ pub struct QueryExecutor<'a> { count: usize, last_buffer: TypedBufferRef, shared_buffers: HashMap<&'static str, TypedBufferRef>, + batch_size: usize, } #[derive(Default, Clone)] @@ -22,6 +23,17 @@ struct ExecutorStage { } impl<'a> QueryExecutor<'a> { + pub fn new(batch_size: usize) -> QueryExecutor<'a> { + QueryExecutor { + ops: vec![], + stages: vec![], + count: 0, + last_buffer: TypedBufferRef::new(error_buffer_ref("ERROR"), EncodingType::Null), + shared_buffers: HashMap::default(), + batch_size, + } + } + pub fn set_buffer_count(&mut self, count: usize) { self.count = count } @@ -173,10 +185,21 @@ impl<'a> QueryExecutor<'a> { for &p in &consumers[output.i] { streaming |= self.ops[p].can_stream_input(output.i); block |= !self.ops[p].can_stream_input(output.i); - log::debug!(" CONSUMER: {} CAN STREAM INPUT: {}, STREAMING: {}, BLOCK: {}", self.ops[p].display(true), self.ops[p].can_stream_input(output.i), streaming, block); + log::debug!( + " CONSUMER: {} CAN STREAM INPUT: {}, STREAMING: {}, BLOCK: {}", + self.ops[p].display(true), + self.ops[p].can_stream_input(output.i), + streaming, + block + ); } streaming_disabled[i] = streaming & block; - log::debug!(" STREAMING: {}, BLOCK: {}, DISABLED: {}", streaming, block, streaming_disabled[i]); + log::debug!( + " STREAMING: {}, BLOCK: {}, DISABLED: {}", + streaming, + block, + streaming_disabled[i] + ); } } } @@ -266,7 +289,11 @@ impl<'a> QueryExecutor<'a> { visited[p] = true; to_visit.push(p); - log::debug!(" ADDING STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage); + log::debug!( + " ADDING STREAMING PRODUCER {} TO STAGE {}", + self.ops[p].display(true), + current_stage + ); stream = stream || self.ops[p].allocates(); } } @@ -294,7 +321,11 @@ impl<'a> QueryExecutor<'a> { visited[consumer] = true; to_visit.push(consumer); - log::debug!(" ADDING STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage); + log::debug!( + " ADDING STREAMING CONSUMER {} TO STAGE {}", + self.ops[consumer].display(true), + current_stage + ); stream = stream || self.ops[consumer].allocates(); } } @@ -317,7 +348,11 @@ impl<'a> QueryExecutor<'a> { visited[p] = true; to_visit.push(p); stream = stream || self.ops[p].allocates(); - log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage); + log::debug!( + " ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING PRODUCER {} TO STAGE {}", + self.ops[p].display(true), + current_stage + ); } let ctr = std::mem::take(&mut consumers_to_revisit); 'l4: for consumer in ctr { @@ -331,7 +366,11 @@ impl<'a> QueryExecutor<'a> { } visited[consumer] = true; to_visit.push(consumer); - log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage); + log::debug!( + " ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING CONSUMER {} TO STAGE {}", + self.ops[consumer].display(true), + current_stage + ); stream = stream || self.ops[consumer].allocates(); } } @@ -386,9 +425,12 @@ impl<'a> QueryExecutor<'a> { } } for input in self.ops[op].inputs() { - let hni = producers[input.i].iter().any(|&p| streaming_disabled[p]); + let hni = producers[input.i].iter().any(|&p| streaming_disabled[p] || !self.ops[p].can_stream_output(input.i)); if hni { - log::debug!("{} has nonstreaming input {} produced by streaming disabled producer {}", self.ops[op].display(true), input.i, producers[input.i].iter().find(|&&p| streaming_disabled[p]).unwrap()); + log::debug!( + "{} has nonstreaming input {} produced by streaming disabled producer {}", + self.ops[op].display(true), input.i, producers[input.i].iter().find(|&&p| streaming_disabled[p] || !self.ops[p].can_stream_output(input.i)).unwrap() + ); } has_nonstreaming_input |= hni; } @@ -513,7 +555,7 @@ impl<'a> QueryExecutor<'a> { max_input_length = column_length; } let batch_size = if self.stages[stage].stream { - 1024 + self.batch_size } else { max_input_length }; @@ -584,18 +626,6 @@ impl<'a> QueryExecutor<'a> { } } -impl<'a> Default for QueryExecutor<'a> { - fn default() -> QueryExecutor<'a> { - QueryExecutor { - ops: vec![], - stages: vec![], - count: 0, - last_buffer: TypedBufferRef::new(error_buffer_ref("ERROR"), EncodingType::Null), - shared_buffers: HashMap::default(), - } - } -} - impl<'a> fmt::Display for QueryExecutor<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let alternate = f.alternate(); diff --git a/src/engine/execution/query_task.rs b/src/engine/execution/query_task.rs index 60dc83c4..9c8f1e38 100644 --- a/src/engine/execution/query_task.rs +++ b/src/engine/execution/query_task.rs @@ -33,6 +33,7 @@ pub struct QueryTask { start_time: Instant, db: Arc, perf_counter: Arc, + batch_size: usize, // Lifetime is not actually static, but tied to the lifetime of this struct. // There is currently no good way to express this constraint in Rust. @@ -81,6 +82,7 @@ pub struct QueryStats { } impl QueryTask { + #[allow(clippy::too_many_arguments)] pub fn new( mut query: Query, rowformat: bool, @@ -89,6 +91,7 @@ impl QueryTask { source: Vec>, db: Arc, sender: SharedSender, + batch_size: usize, ) -> Result { let start_time = Instant::now(); if query.is_select_star() { @@ -121,6 +124,7 @@ impl QueryTask { start_time, db, perf_counter: Arc::default(), + batch_size, unsafe_state: Mutex::new(QueryState { partial_results: Vec::new(), @@ -171,11 +175,17 @@ impl QueryTask { >(&cols) }; let (mut batch_result, explain) = match if self.main_phase.aggregate.is_empty() { - self.main_phase - .run(unsafe_cols, self.explain, show, id, partition.len()) + self.main_phase.run( + unsafe_cols, + self.explain, + show, + id, + partition.len(), + self.batch_size, + ) } else { self.main_phase - .run_aggregate(unsafe_cols, self.explain, show, id, partition.len()) + .run_aggregate(unsafe_cols, self.explain, show, id, partition.len(), self.batch_size) } { Ok(result) => result, Err(error) => { @@ -192,7 +202,7 @@ impl QueryTask { // Merge only with previous batch results of same level to get O(n log n) complexity while let Some(br) = batch_results.pop() { if br.level == batch_result.level { - match combine(br, batch_result, self.combined_limit()) { + match combine(br, batch_result, self.combined_limit(), self.batch_size) { Ok(result) => batch_result = result, Err(error) => { self.fail_with(error); @@ -214,7 +224,7 @@ impl QueryTask { } } - match QueryTask::combine_results(batch_results, self.combined_limit()) { + match QueryTask::combine_results(batch_results, self.combined_limit(), self.batch_size) { Ok(Some(result)) => self.push_result(result, rows_scanned, rows_collected, explains), Err(error) => self.fail_with(error), _ => {} @@ -226,11 +236,12 @@ impl QueryTask { fn combine_results( batch_results: Vec, limit: usize, + batch_size: usize, ) -> Result, QueryError> { let mut full_result = None; for batch_result in batch_results { if let Some(partial) = full_result { - full_result = Some(combine(partial, batch_result, limit)?); + full_result = Some(combine(partial, batch_result, limit, batch_size)?); } else { full_result = Some(batch_result); } @@ -262,8 +273,11 @@ impl QueryTask { { let mut owned_results = Vec::with_capacity(0); mem::swap(&mut owned_results, &mut state.partial_results); - let full_result = match QueryTask::combine_results(owned_results, self.combined_limit()) - { + let full_result = match QueryTask::combine_results( + owned_results, + self.combined_limit(), + self.batch_size, + ) { Ok(result) => result.unwrap(), Err(error) => { self.fail_with_no_lock(error); @@ -285,6 +299,7 @@ impl QueryTask { !self.show.is_empty(), 0xdead_beef, cols.iter().next().map(|(_, c)| c.len()).unwrap_or(0), + self.batch_size, ) .unwrap() .0; @@ -451,7 +466,7 @@ impl BasicTypeColumn { | EncodingType::NullableU32 | EncodingType::NullableU64 | EncodingType::NullableF64 - | EncodingType::OptStr + | EncodingType::OptStr | EncodingType::OptF64 => { let mut vals = vec![]; for i in 0..data.len() { diff --git a/src/engine/planning/planner.rs b/src/engine/planning/planner.rs index 803e743c..d1ebe476 100644 --- a/src/engine/planning/planner.rs +++ b/src/engine/planning/planner.rs @@ -19,10 +19,10 @@ pub struct QueryPlanner { } impl QueryPlanner { - pub fn prepare<'a>(&mut self, mut constant_vecs: Vec>) -> Result, QueryError> { + pub fn prepare<'a>(&mut self, mut constant_vecs: Vec>, batch_size: usize) -> Result, QueryError> { self.perform_rewrites(); - let mut result = QueryExecutor::default(); + let mut result = QueryExecutor::new(batch_size); result.set_buffer_count(self.buffer_provider.buffer_count()); for operation in &self.operations { prepare(operation.clone(), &mut constant_vecs, &mut result)?; diff --git a/src/engine/planning/query.rs b/src/engine/planning/query.rs index f8732e46..9474a650 100644 --- a/src/engine/planning/query.rs +++ b/src/engine/planning/query.rs @@ -46,6 +46,7 @@ impl NormalFormQuery { show: bool, partition: usize, partition_len: usize, + batch_size: usize, ) -> Result<(BatchResult<'a>, Option), QueryError> { let limit = (self.limit.limit + self.limit.offset) as usize; let mut planner = QueryPlanner::default(); @@ -138,7 +139,7 @@ impl NormalFormQuery { for c in columns { debug!("{}: {:?}", partition, c); } - let mut executor = planner.prepare(vec![])?; + let mut executor = planner.prepare(vec![], batch_size)?; let mut results = executor.prepare(NormalFormQuery::column_data(columns)); debug!("{:#}", &executor); executor.run(partition_len, &mut results, show)?; @@ -171,6 +172,7 @@ impl NormalFormQuery { show: bool, partition: usize, partition_len: usize, + batch_size: usize, ) -> Result<(BatchResult<'a>, Option), QueryError> { let mut qp = QueryPlanner::default(); @@ -370,7 +372,7 @@ impl NormalFormQuery { for c in columns { debug!("{}: {:?}", partition, c); } - let mut executor = qp.prepare(vec![])?; + let mut executor = qp.prepare(vec![], batch_size)?; let mut results = executor.prepare(NormalFormQuery::column_data(columns)); debug!("{:#}", &executor); executor.run(partition_len, &mut results, show)?; diff --git a/src/locustdb.rs b/src/locustdb.rs index 886dabd2..2683acad 100644 --- a/src/locustdb.rs +++ b/src/locustdb.rs @@ -27,6 +27,7 @@ impl LocustDB { } pub fn new(opts: &Options) -> LocustDB { + opts.validate().expect("Invalid options"); let locustdb = Arc::new(InnerLocustDB::new(opts)); InnerLocustDB::start_worker_threads(&locustdb); LocustDB { @@ -81,6 +82,7 @@ impl LocustDB { data, self.inner_locustdb.disk_read_scheduler().clone(), SharedSender::new(sender), + self.inner_locustdb.opts().batch_size, ); match query_task { @@ -202,6 +204,8 @@ pub struct Options { pub max_partition_size_bytes: u64, /// Combine partitions when the size of every original partition is less than this factor of the combined partition size pub partition_combine_factor: u64, + /// Maximum length of temporary buffer used in streaming stages during query execution + pub batch_size: usize, } impl Default for Options { @@ -217,10 +221,29 @@ impl Default for Options { max_wal_size_bytes: 64 * 1024 * 1024, // 64 MiB max_partition_size_bytes: 8 * 1024 * 1024, // 8 MiB partition_combine_factor: 4, + batch_size: 1024, } } } +impl Options { + fn validate(&self) -> Result<(), String> { + if self.threads == 0 { + return Err("threads must be greater than 0".to_string()); + } + if self.read_threads == 0 { + return Err("read_threads must be greater than 0".to_string()); + } + if self.partition_combine_factor == 0 { + return Err("partition_combine_factor must be greater than 0".to_string()); + } + if self.batch_size % 8 != 0 { + return Err("batch_size must be a multiple of 8".to_string()); + } + Ok(()) + } +} + impl Drop for LocustDB { fn drop(&mut self) { self.inner_locustdb.stop(); diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 5877ab7c..24cb9b17 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -269,6 +269,7 @@ impl InnerLocustDB { data.clone(), self.disk_read_scheduler().clone(), SharedSender::new(sender), + self.opts.batch_size, ) .unwrap(); self.schedule(query_task); diff --git a/tests/query_tests.rs b/tests/query_tests.rs index 23fe6371..f551d843 100644 --- a/tests/query_tests.rs +++ b/tests/query_tests.rs @@ -30,35 +30,44 @@ fn test_query(query: &str, expected_rows: &[Vec]) { fn test_query_ec(query: &str, expected_rows: &[Vec]) { let _ = env_logger::try_init(); - #[allow(unused_mut)] - let mut opts = Options::default(); - if env::var("DEBUG_TESTS").is_ok() { - opts.threads = 1; - } - let locustdb = LocustDB::new(&opts); - let _ = block_on( - locustdb.load_csv( - LoadOptions::new("test_data/edge_cases.csv", "default") - .with_partition_size(3) - .allow_nulls_all_columns(), - ), - ); - let result1; - let result2 = if env::var("DEBUG_TESTS").is_ok() { - result1 = block_on(locustdb.run_query(query, false, true, vec![0, 1, 2, 3])).unwrap(); - locustdb.force_flush(); - block_on(locustdb.run_query(query, false, true, vec![0, 1, 2, 3])).unwrap() - } else { - result1 = block_on(locustdb.run_query(query, false, true, vec![])).unwrap(); + let optss = [ + Options::default(), + Options { + batch_size: 8, + ..Options::default() + }, + ]; + + for mut opts in optss { + if env::var("DEBUG_TESTS").is_ok() { + opts.threads = 1; + } + let locustdb = LocustDB::new(&opts); + let _ = block_on( + locustdb.load_csv( + LoadOptions::new("test_data/edge_cases.csv", "default") + .with_partition_size(3) + .allow_nulls_all_columns(), + ), + ); + + let show = if env::var("DEBUG_TESTS").is_ok() { + vec![0, 1, 2, 3] + } else { + vec![] + }; + + let result1 = block_on(locustdb.run_query(query, false, true, show.clone())).unwrap(); locustdb.force_flush(); - block_on(locustdb.run_query(query, false, true, vec![])).unwrap() - }; - assert_eq!( - result1.as_ref().unwrap().rows, - result2.as_ref().unwrap().rows, - "Query results differ after flush" - ); - assert_eq!(result1.unwrap().rows.unwrap(), expected_rows); + let result2 = block_on(locustdb.run_query(query, false, true, show)).unwrap(); + + assert_eq!( + result1.as_ref().unwrap().rows, + result2.as_ref().unwrap().rows, + "Query results differ after flush" + ); + assert_eq!(result1.unwrap().rows.unwrap(), expected_rows); + } } fn test_query_ec_err(query: &str, _expected_err: QueryError) {