Skip to content

Commit

Permalink
fix bug caused by streaming stage with nonstreaming input
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 13, 2024
1 parent e67a5da commit 225aaee
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 19 deletions.
32 changes: 28 additions & 4 deletions src/engine/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,29 @@ impl<'a> QueryExecutor<'a> {
// Disable streaming output for operators that have streaming + nonstreaming consumers
let mut streaming_disabled = vec![false; self.ops.len()];
for (i, op) in self.ops.iter().enumerate() {
log::debug!("DETERMINING STREAMING FOR {}", op.display(true));
for output in op.outputs() {
log::debug!(" DETERMINING STREAMING FOR OUTPUT {}", output);
if op.can_stream_output(output.i) {
log::debug!(" CHECKING STREAMABLE");
// Are there any ops consuming this output that are streaming?
let mut streaming = false;
// Are there any ops consuming this output that are not streaming?
let mut block = false;
log::debug!(" CONSUMERS: {:?}", &consumers[output.i]);
for &p in &consumers[output.i] {
streaming |= self.ops[p].can_stream_input(output.i);
block |= !self.ops[p].can_stream_input(output.i);
log::debug!(" CONSUMER: {} CAN STREAM INPUT: {}, STREAMING: {}, BLOCK: {}", self.ops[p].display(true), self.ops[p].can_stream_input(output.i), streaming, block);
}
streaming_disabled[i] = streaming & block;
log::debug!(" STREAMING: {}, BLOCK: {}, DISABLED: {}", streaming, block, streaming_disabled[i]);
}
}
}

// Group operators into stages
// This is done by taking an operator, and adding all operators that can be streamed to/from
let mut visited = vec![false; self.ops.len()];
let mut dependencies_visited = vec![false; self.ops.len()];
let mut topo_pushed = vec![false; self.ops.len()];
Expand Down Expand Up @@ -202,6 +211,7 @@ impl<'a> QueryExecutor<'a> {
while let Some(current) = to_visit.pop() {
let op = &self.ops[current];
let current_stage = stages.len() as i32;
log::debug!("VISITING {} IN STAGE {}", op.display(true), current_stage);
stage[current] = current_stage;
ops.push(current);
// Mark any new transitive inputs/outputs
Expand Down Expand Up @@ -256,6 +266,7 @@ impl<'a> QueryExecutor<'a> {

visited[p] = true;
to_visit.push(p);
log::debug!(" ADDING STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage);
stream = stream || self.ops[p].allocates();
}
}
Expand All @@ -271,7 +282,7 @@ impl<'a> QueryExecutor<'a> {
continue;
}
// Including op in this stage would introduce a cycle if any of the
// inputs is produces by a transitive output to stage
// inputs is produced by a transitive output to stage
for input in self.ops[consumer].inputs() {
for &p in &producers[input.i] {
if transitive_output[p] && stage[p] != current_stage {
Expand All @@ -283,6 +294,7 @@ impl<'a> QueryExecutor<'a> {

visited[consumer] = true;
to_visit.push(consumer);
log::debug!(" ADDING STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage);
stream = stream || self.ops[consumer].allocates();
}
}
Expand All @@ -305,6 +317,7 @@ impl<'a> QueryExecutor<'a> {
visited[p] = true;
to_visit.push(p);
stream = stream || self.ops[p].allocates();
log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage);
}
let ctr = std::mem::take(&mut consumers_to_revisit);
'l4: for consumer in ctr {
Expand All @@ -318,6 +331,7 @@ impl<'a> QueryExecutor<'a> {
}
visited[consumer] = true;
to_visit.push(consumer);
log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage);
stream = stream || self.ops[consumer].allocates();
}
}
Expand Down Expand Up @@ -354,10 +368,12 @@ impl<'a> QueryExecutor<'a> {

// Determine if stage/ops should be streaming
let mut has_streaming_producer = false;
// TODO: should prevent group with nonstreaming input to be formed in first place? insert op that allows non-streaming input to be streamed?
let mut has_nonstreaming_input = false;
let ops = total_order
.into_iter()
.map(|op| {
has_streaming_producer |= self.ops[op].is_streaming_producer();
has_streaming_producer |= self.ops[op].is_streaming_producer() && !streaming_disabled[op];
let mut streaming_consumers = false;
let mut block_consumers = false;
for output in self.ops[op].outputs() {
Expand All @@ -369,14 +385,22 @@ impl<'a> QueryExecutor<'a> {
}
}
}
for input in self.ops[op].inputs() {
let hni = producers[input.i].iter().any(|&p| streaming_disabled[p]);
if hni {
log::debug!("{} has nonstreaming input {} produced by streaming disabled producer {}", self.ops[op].display(true), input.i, producers[input.i].iter().find(|&&p| streaming_disabled[p]).unwrap());
}
has_nonstreaming_input |= hni;
}
(op, streaming_consumers && !block_consumers)
})
.collect::<Vec<_>>();

log::debug!("STAGE {} STREAMING: stream={} has_streaming_producer={} !has_nonstreaming_input={}", stages.len(), stream, has_streaming_producer, !has_nonstreaming_input);
// TODO(#98): Make streaming possible for stages reading from temp results
stages.push(ExecutorStage {
ops,
stream: stream && has_streaming_producer,
stream: stream && has_streaming_producer && !has_nonstreaming_input,
})
}

Expand Down Expand Up @@ -526,7 +550,7 @@ impl<'a> QueryExecutor<'a> {
has_more = false;
for &(op, streamable) in &self.stages[stage].ops {
if show && iters == 0 {
println!("{}", self.ops[op].display(true));
println!("{} streamable={streamable}", self.ops[op].display(true));
}
self.ops[op].execute(stream && streamable, scratchpad)?;
if show && iters == 0 {
Expand Down
58 changes: 43 additions & 15 deletions src/engine/operators/column_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,64 @@ pub struct ReadColumnData {
}

impl<'a> VecOperator<'a> for ReadColumnData {
fn execute(&mut self, streaming: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
fn execute(
&mut self,
streaming: bool,
scratchpad: &mut Scratchpad<'a>,
) -> Result<(), QueryError> {
let data_section = scratchpad.get_column_data(&self.colname, self.section_index);
if self.is_bitvec {
// Basic sanity check, this will panic if the data is not u8
data_section.cast_ref_u8();
assert!(self.current_index & 7 == 0, "Bitvec read must be aligned to byte boundary");
assert!(
self.current_index & 7 == 0,
"Bitvec read must be aligned to byte boundary"
);
}
let end = if streaming { self.current_index + self.batch_size } else { data_section.len() };
let result = if self.is_bitvec {
data_section.slice_box((self.current_index + 7) / 8, (end + 7) / 8)
let (from, to) = if streaming {
if self.is_bitvec {
(
(self.current_index + 7) / 8,
(self.current_index + self.batch_size + 7) / 8,
)
} else {
(self.current_index, self.current_index + self.batch_size)
}
} else {
data_section.slice_box(self.current_index, end)
(0, data_section.len())
};
self.current_index += self.batch_size;
let result = data_section.slice_box(from, to);
scratchpad.set_any(self.output, result);
self.has_more = end < data_section.len();
self.current_index += self.batch_size;
self.has_more = to < data_section.len();
Ok(())
}

fn init(&mut self, _: usize, batch_size: usize, _: &mut Scratchpad<'a>) {
self.batch_size = batch_size;
}

fn inputs(&self) -> Vec<BufferRef<Any>> { vec![] }
fn outputs(&self) -> Vec<BufferRef<Any>> { vec![self.output] }
fn can_stream_input(&self, _: usize) -> bool { false }
fn can_stream_output(&self, _: usize) -> bool { true }
fn allocates(&self) -> bool { false }
fn is_streaming_producer(&self) -> bool { true }
fn has_more(&self) -> bool { self.has_more }
fn inputs(&self) -> Vec<BufferRef<Any>> {
vec![]
}
fn outputs(&self) -> Vec<BufferRef<Any>> {
vec![self.output]
}
fn can_stream_input(&self, _: usize) -> bool {
false
}
fn can_stream_output(&self, _: usize) -> bool {
true
}
fn allocates(&self) -> bool {
false
}
fn is_streaming_producer(&self) -> bool {
true
}
fn has_more(&self) -> bool {
self.has_more
}

fn display_op(&self, _: bool) -> String {
format!("{:?}.{}", self.colname, self.section_index)
Expand Down
44 changes: 44 additions & 0 deletions tests/query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,8 +1108,52 @@ fn test_long_nullable() {
let expected_rows : Vec<[Value; 1]> = vec![];
let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap();
assert_eq!(result.unwrap().rows.unwrap(), expected_rows);

let query = "SELECT nullable_int, count(1) FROM test;";
let expected_rows = vec![
[Null, Int(235917)],
[Int(-10), Int(13296)],
[Int(1), Int(12931)],
];
let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap();
assert_eq!(result.unwrap().rows.unwrap(), expected_rows);

locustdb.force_flush();
let query = "SELECT nullable_int FROM test WHERE nullable_int IS NOT NULL;";
let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap();
assert_eq!(result.unwrap().rows.unwrap().len(), 26227);
}

#[test]
fn test_sequential_int_sort() {
let _ = env_logger::try_init();
let locustdb = LocustDB::memory_only();
let _ = block_on(locustdb.gen_table(locustdb::colgen::GenTable {
name: "test".to_string(),
partitions: 1,
partition_size: 64,
columns: vec![(
"_step".to_string(),
locustdb::colgen::incrementing_int(),
)],
}));
let query = "SELECT _step FROM test WHERE _step IS NOT NULL ORDER BY _step;";
let expected_rows : Vec<[Value; 1]> = vec![
[Int(0)],
[Int(1)],
[Int(2)],
[Int(3)],
[Int(4)],
[Int(5)],
[Int(6)],
[Int(7)],
[Int(8)],
];
let result = block_on(locustdb.run_query(query, true, true, vec![0, 1, 2, 3])).unwrap();
assert_eq!(result.unwrap().rows.unwrap()[0..9], expected_rows);
}


#[test]
fn test_group_by_string() {
use crate::value_syntax::*;
Expand Down

0 comments on commit 225aaee

Please sign in to comment.