From 225aaee57251518afbef16626730791ed2da7571 Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Wed, 13 Mar 2024 00:56:15 -0700 Subject: [PATCH] fix bug caused by streaming stage with nonstreaming input --- src/engine/execution/executor.rs | 32 ++++++++++++++--- src/engine/operators/column_ops.rs | 58 ++++++++++++++++++++++-------- tests/query_tests.rs | 44 +++++++++++++++++++++++ 3 files changed, 115 insertions(+), 19 deletions(-) diff --git a/src/engine/execution/executor.rs b/src/engine/execution/executor.rs index e421369f..8d1dd4f0 100644 --- a/src/engine/execution/executor.rs +++ b/src/engine/execution/executor.rs @@ -160,20 +160,29 @@ impl<'a> QueryExecutor<'a> { // Disable streaming output for operators that have streaming + nonstreaming consumers let mut streaming_disabled = vec![false; self.ops.len()]; for (i, op) in self.ops.iter().enumerate() { + log::debug!("DETERMINING STREAMING FOR {}", op.display(true)); for output in op.outputs() { + log::debug!(" DETERMINING STREAMING FOR OUTPUT {}", output); if op.can_stream_output(output.i) { + log::debug!(" CHECKING STREAMABLE"); + // Are there any ops consuming this output that are streaming? let mut streaming = false; + // Are there any ops consuming this output that are not streaming? let mut block = false; + log::debug!(" CONSUMERS: {:?}", &consumers[output.i]); for &p in &consumers[output.i] { streaming |= self.ops[p].can_stream_input(output.i); block |= !self.ops[p].can_stream_input(output.i); + log::debug!(" CONSUMER: {} CAN STREAM INPUT: {}, STREAMING: {}, BLOCK: {}", self.ops[p].display(true), self.ops[p].can_stream_input(output.i), streaming, block); } streaming_disabled[i] = streaming & block; + log::debug!(" STREAMING: {}, BLOCK: {}, DISABLED: {}", streaming, block, streaming_disabled[i]); } } } // Group operators into stages + // This is done by taking an operator, and adding all operators that can be streamed to/from let mut visited = vec![false; self.ops.len()]; let mut dependencies_visited = vec![false; self.ops.len()]; let mut topo_pushed = vec![false; self.ops.len()]; @@ -202,6 +211,7 @@ impl<'a> QueryExecutor<'a> { while let Some(current) = to_visit.pop() { let op = &self.ops[current]; let current_stage = stages.len() as i32; + log::debug!("VISITING {} IN STAGE {}", op.display(true), current_stage); stage[current] = current_stage; ops.push(current); // Mark any new transitive inputs/outputs @@ -256,6 +266,7 @@ impl<'a> QueryExecutor<'a> { visited[p] = true; to_visit.push(p); + log::debug!(" ADDING STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage); stream = stream || self.ops[p].allocates(); } } @@ -271,7 +282,7 @@ impl<'a> QueryExecutor<'a> { continue; } // Including op in this stage would introduce a cycle if any of the - // inputs is produces by a transitive output to stage + // inputs is produced by a transitive output to stage for input in self.ops[consumer].inputs() { for &p in &producers[input.i] { if transitive_output[p] && stage[p] != current_stage { @@ -283,6 +294,7 @@ impl<'a> QueryExecutor<'a> { visited[consumer] = true; to_visit.push(consumer); + log::debug!(" ADDING STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage); stream = stream || self.ops[consumer].allocates(); } } @@ -305,6 +317,7 @@ impl<'a> QueryExecutor<'a> { visited[p] = true; to_visit.push(p); stream = stream || self.ops[p].allocates(); + log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage); } let ctr = std::mem::take(&mut consumers_to_revisit); 'l4: for consumer in ctr { @@ -318,6 +331,7 @@ impl<'a> QueryExecutor<'a> { } visited[consumer] = true; to_visit.push(consumer); + log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage); stream = stream || self.ops[consumer].allocates(); } } @@ -354,10 +368,12 @@ impl<'a> QueryExecutor<'a> { // Determine if stage/ops should be streaming let mut has_streaming_producer = false; + // TODO: should prevent group with nonstreaming input to be formed in first place? insert op that allows non-streaming input to be streamed? + let mut has_nonstreaming_input = false; let ops = total_order .into_iter() .map(|op| { - has_streaming_producer |= self.ops[op].is_streaming_producer(); + has_streaming_producer |= self.ops[op].is_streaming_producer() && !streaming_disabled[op]; let mut streaming_consumers = false; let mut block_consumers = false; for output in self.ops[op].outputs() { @@ -369,14 +385,22 @@ impl<'a> QueryExecutor<'a> { } } } + for input in self.ops[op].inputs() { + let hni = producers[input.i].iter().any(|&p| streaming_disabled[p]); + if hni { + log::debug!("{} has nonstreaming input {} produced by streaming disabled producer {}", self.ops[op].display(true), input.i, producers[input.i].iter().find(|&&p| streaming_disabled[p]).unwrap()); + } + has_nonstreaming_input |= hni; + } (op, streaming_consumers && !block_consumers) }) .collect::>(); + log::debug!("STAGE {} STREAMING: stream={} has_streaming_producer={} !has_nonstreaming_input={}", stages.len(), stream, has_streaming_producer, !has_nonstreaming_input); // TODO(#98): Make streaming possible for stages reading from temp results stages.push(ExecutorStage { ops, - stream: stream && has_streaming_producer, + stream: stream && has_streaming_producer && !has_nonstreaming_input, }) } @@ -526,7 +550,7 @@ impl<'a> QueryExecutor<'a> { has_more = false; for &(op, streamable) in &self.stages[stage].ops { if show && iters == 0 { - println!("{}", self.ops[op].display(true)); + println!("{} streamable={streamable}", self.ops[op].display(true)); } self.ops[op].execute(stream && streamable, scratchpad)?; if show && iters == 0 { diff --git a/src/engine/operators/column_ops.rs b/src/engine/operators/column_ops.rs index 7e8c1282..49fe2b14 100644 --- a/src/engine/operators/column_ops.rs +++ b/src/engine/operators/column_ops.rs @@ -13,22 +13,36 @@ pub struct ReadColumnData { } impl<'a> VecOperator<'a> for ReadColumnData { - fn execute(&mut self, streaming: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { + fn execute( + &mut self, + streaming: bool, + scratchpad: &mut Scratchpad<'a>, + ) -> Result<(), QueryError> { let data_section = scratchpad.get_column_data(&self.colname, self.section_index); if self.is_bitvec { // Basic sanity check, this will panic if the data is not u8 data_section.cast_ref_u8(); - assert!(self.current_index & 7 == 0, "Bitvec read must be aligned to byte boundary"); + assert!( + self.current_index & 7 == 0, + "Bitvec read must be aligned to byte boundary" + ); } - let end = if streaming { self.current_index + self.batch_size } else { data_section.len() }; - let result = if self.is_bitvec { - data_section.slice_box((self.current_index + 7) / 8, (end + 7) / 8) + let (from, to) = if streaming { + if self.is_bitvec { + ( + (self.current_index + 7) / 8, + (self.current_index + self.batch_size + 7) / 8, + ) + } else { + (self.current_index, self.current_index + self.batch_size) + } } else { - data_section.slice_box(self.current_index, end) + (0, data_section.len()) }; - self.current_index += self.batch_size; + let result = data_section.slice_box(from, to); scratchpad.set_any(self.output, result); - self.has_more = end < data_section.len(); + self.current_index += self.batch_size; + self.has_more = to < data_section.len(); Ok(()) } @@ -36,13 +50,27 @@ impl<'a> VecOperator<'a> for ReadColumnData { self.batch_size = batch_size; } - fn inputs(&self) -> Vec> { vec![] } - fn outputs(&self) -> Vec> { vec![self.output] } - fn can_stream_input(&self, _: usize) -> bool { false } - fn can_stream_output(&self, _: usize) -> bool { true } - fn allocates(&self) -> bool { false } - fn is_streaming_producer(&self) -> bool { true } - fn has_more(&self) -> bool { self.has_more } + fn inputs(&self) -> Vec> { + vec![] + } + fn outputs(&self) -> Vec> { + vec![self.output] + } + fn can_stream_input(&self, _: usize) -> bool { + false + } + fn can_stream_output(&self, _: usize) -> bool { + true + } + fn allocates(&self) -> bool { + false + } + fn is_streaming_producer(&self) -> bool { + true + } + fn has_more(&self) -> bool { + self.has_more + } fn display_op(&self, _: bool) -> String { format!("{:?}.{}", self.colname, self.section_index) diff --git a/tests/query_tests.rs b/tests/query_tests.rs index 1c792673..a9636017 100644 --- a/tests/query_tests.rs +++ b/tests/query_tests.rs @@ -1108,8 +1108,52 @@ fn test_long_nullable() { let expected_rows : Vec<[Value; 1]> = vec![]; let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap(); assert_eq!(result.unwrap().rows.unwrap(), expected_rows); + + let query = "SELECT nullable_int, count(1) FROM test;"; + let expected_rows = vec![ + [Null, Int(235917)], + [Int(-10), Int(13296)], + [Int(1), Int(12931)], + ]; + let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap(); + assert_eq!(result.unwrap().rows.unwrap(), expected_rows); + + locustdb.force_flush(); + let query = "SELECT nullable_int FROM test WHERE nullable_int IS NOT NULL;"; + let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap(); + assert_eq!(result.unwrap().rows.unwrap().len(), 26227); } +#[test] +fn test_sequential_int_sort() { + let _ = env_logger::try_init(); + let locustdb = LocustDB::memory_only(); + let _ = block_on(locustdb.gen_table(locustdb::colgen::GenTable { + name: "test".to_string(), + partitions: 1, + partition_size: 64, + columns: vec![( + "_step".to_string(), + locustdb::colgen::incrementing_int(), + )], + })); + let query = "SELECT _step FROM test WHERE _step IS NOT NULL ORDER BY _step;"; + let expected_rows : Vec<[Value; 1]> = vec![ + [Int(0)], + [Int(1)], + [Int(2)], + [Int(3)], + [Int(4)], + [Int(5)], + [Int(6)], + [Int(7)], + [Int(8)], + ]; + let result = block_on(locustdb.run_query(query, true, true, vec![0, 1, 2, 3])).unwrap(); + assert_eq!(result.unwrap().rows.unwrap()[0..9], expected_rows); +} + + #[test] fn test_group_by_string() { use crate::value_syntax::*;