Skip to content

Commit

Permalink
Test with small batch size and fix streaming bug
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 16, 2024
1 parent f03dbba commit 33b84db
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 64 deletions.
5 changes: 5 additions & 0 deletions src/bin/repl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ struct Opt {
/// Address to bind the server to
#[structopt(long, default_value = "127.0.0.1:8080")]
addrs: String,

/// Maximum length of temporary buffer used in streaming stages during query execution
batch_size: usize,
}

fn main() {
Expand All @@ -118,6 +121,7 @@ fn main() {
cors_allow_all,
cors_allow_origin,
addrs,
batch_size,
} = Opt::from_args();

let options = locustdb::Options {
Expand All @@ -131,6 +135,7 @@ fn main() {
max_wal_size_bytes,
max_partition_size_bytes,
partition_combine_factor: 4,
batch_size,
};

if options.readahead > options.mem_size_limit_tables {
Expand Down
5 changes: 3 additions & 2 deletions src/engine/execution/batch_merging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn combine<'a>(
batch1: BatchResult<'a>,
batch2: BatchResult<'a>,
limit: usize,
batch_size: usize,
) -> Result<BatchResult<'a>, QueryError> {
ensure!(
batch1.projection.len() == batch2.projection.len(),
Expand Down Expand Up @@ -155,7 +156,7 @@ pub fn combine<'a>(
aggregates.push((aggregated.any(), aggregator));
}

let mut executor = qp.prepare(data)?;
let mut executor = qp.prepare(data, batch_size)?;
let mut results = executor.prepare_no_columns();
executor.run(1, &mut results, batch1.show || batch2.show)?;

Expand Down Expand Up @@ -261,7 +262,7 @@ pub fn combine<'a>(
}
order_by.push((merged_final_sort_col.any(), final_desc));

let mut executor = qp.prepare(data)?;
let mut executor = qp.prepare(data, batch_size)?;
let mut results = executor.prepare_no_columns();
executor.run(1, &mut results, batch1.show || batch2.show)?;
let (columns, projection, _, order_by) =
Expand Down
72 changes: 51 additions & 21 deletions src/engine/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct QueryExecutor<'a> {
count: usize,
last_buffer: TypedBufferRef,
shared_buffers: HashMap<&'static str, TypedBufferRef>,
batch_size: usize,
}

#[derive(Default, Clone)]
Expand All @@ -22,6 +23,17 @@ struct ExecutorStage {
}

impl<'a> QueryExecutor<'a> {
pub fn new(batch_size: usize) -> QueryExecutor<'a> {
QueryExecutor {
ops: vec![],
stages: vec![],
count: 0,
last_buffer: TypedBufferRef::new(error_buffer_ref("ERROR"), EncodingType::Null),
shared_buffers: HashMap::default(),
batch_size,
}
}

pub fn set_buffer_count(&mut self, count: usize) {
self.count = count
}
Expand Down Expand Up @@ -173,10 +185,21 @@ impl<'a> QueryExecutor<'a> {
for &p in &consumers[output.i] {
streaming |= self.ops[p].can_stream_input(output.i);
block |= !self.ops[p].can_stream_input(output.i);
log::debug!(" CONSUMER: {} CAN STREAM INPUT: {}, STREAMING: {}, BLOCK: {}", self.ops[p].display(true), self.ops[p].can_stream_input(output.i), streaming, block);
log::debug!(
" CONSUMER: {} CAN STREAM INPUT: {}, STREAMING: {}, BLOCK: {}",
self.ops[p].display(true),
self.ops[p].can_stream_input(output.i),
streaming,
block
);
}
streaming_disabled[i] = streaming & block;
log::debug!(" STREAMING: {}, BLOCK: {}, DISABLED: {}", streaming, block, streaming_disabled[i]);
log::debug!(
" STREAMING: {}, BLOCK: {}, DISABLED: {}",
streaming,
block,
streaming_disabled[i]
);
}
}
}
Expand Down Expand Up @@ -266,7 +289,11 @@ impl<'a> QueryExecutor<'a> {

visited[p] = true;
to_visit.push(p);
log::debug!(" ADDING STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage);
log::debug!(
" ADDING STREAMING PRODUCER {} TO STAGE {}",
self.ops[p].display(true),
current_stage
);
stream = stream || self.ops[p].allocates();
}
}
Expand Down Expand Up @@ -294,7 +321,11 @@ impl<'a> QueryExecutor<'a> {

visited[consumer] = true;
to_visit.push(consumer);
log::debug!(" ADDING STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage);
log::debug!(
" ADDING STREAMING CONSUMER {} TO STAGE {}",
self.ops[consumer].display(true),
current_stage
);
stream = stream || self.ops[consumer].allocates();
}
}
Expand All @@ -317,7 +348,11 @@ impl<'a> QueryExecutor<'a> {
visited[p] = true;
to_visit.push(p);
stream = stream || self.ops[p].allocates();
log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING PRODUCER {} TO STAGE {}", self.ops[p].display(true), current_stage);
log::debug!(
" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING PRODUCER {} TO STAGE {}",
self.ops[p].display(true),
current_stage
);
}
let ctr = std::mem::take(&mut consumers_to_revisit);
'l4: for consumer in ctr {
Expand All @@ -331,7 +366,11 @@ impl<'a> QueryExecutor<'a> {
}
visited[consumer] = true;
to_visit.push(consumer);
log::debug!(" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING CONSUMER {} TO STAGE {}", self.ops[consumer].display(true), current_stage);
log::debug!(
" ADDING PREVIOUSLY CYCLE EXCLUDED STREAMING CONSUMER {} TO STAGE {}",
self.ops[consumer].display(true),
current_stage
);
stream = stream || self.ops[consumer].allocates();
}
}
Expand Down Expand Up @@ -386,9 +425,12 @@ impl<'a> QueryExecutor<'a> {
}
}
for input in self.ops[op].inputs() {
let hni = producers[input.i].iter().any(|&p| streaming_disabled[p]);
let hni = producers[input.i].iter().any(|&p| streaming_disabled[p] || !self.ops[p].can_stream_output(input.i));
if hni {
log::debug!("{} has nonstreaming input {} produced by streaming disabled producer {}", self.ops[op].display(true), input.i, producers[input.i].iter().find(|&&p| streaming_disabled[p]).unwrap());
log::debug!(
"{} has nonstreaming input {} produced by streaming disabled producer {}",
self.ops[op].display(true), input.i, producers[input.i].iter().find(|&&p| streaming_disabled[p] || !self.ops[p].can_stream_output(input.i)).unwrap()
);
}
has_nonstreaming_input |= hni;
}
Expand Down Expand Up @@ -513,7 +555,7 @@ impl<'a> QueryExecutor<'a> {
max_input_length = column_length;
}
let batch_size = if self.stages[stage].stream {
1024
self.batch_size
} else {
max_input_length
};
Expand Down Expand Up @@ -584,18 +626,6 @@ impl<'a> QueryExecutor<'a> {
}
}

// NOTE(review): this `Default` impl is the code REMOVED by this commit.
// `QueryExecutor` gained a mandatory `batch_size` field, for which no
// meaningful default exists, so construction moved to `QueryExecutor::new(batch_size)`.
impl<'a> Default for QueryExecutor<'a> {
    fn default() -> QueryExecutor<'a> {
        QueryExecutor {
            ops: vec![],
            stages: vec![],
            count: 0,
            // Placeholder buffer; resolving it without reassignment is an error.
            last_buffer: TypedBufferRef::new(error_buffer_ref("ERROR"), EncodingType::Null),
            shared_buffers: HashMap::default(),
        }
    }
}

impl<'a> fmt::Display for QueryExecutor<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let alternate = f.alternate();
Expand Down
33 changes: 24 additions & 9 deletions src/engine/execution/query_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct QueryTask {
start_time: Instant,
db: Arc<DiskReadScheduler>,
perf_counter: Arc<QueryPerfCounter>,
batch_size: usize,

// Lifetime is not actually static, but tied to the lifetime of this struct.
// There is currently no good way to express this constraint in Rust.
Expand Down Expand Up @@ -81,6 +82,7 @@ pub struct QueryStats {
}

impl QueryTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
mut query: Query,
rowformat: bool,
Expand All @@ -89,6 +91,7 @@ impl QueryTask {
source: Vec<Arc<Partition>>,
db: Arc<DiskReadScheduler>,
sender: SharedSender<QueryResult>,
batch_size: usize,
) -> Result<QueryTask, QueryError> {
let start_time = Instant::now();
if query.is_select_star() {
Expand Down Expand Up @@ -121,6 +124,7 @@ impl QueryTask {
start_time,
db,
perf_counter: Arc::default(),
batch_size,

unsafe_state: Mutex::new(QueryState {
partial_results: Vec::new(),
Expand Down Expand Up @@ -171,11 +175,17 @@ impl QueryTask {
>(&cols)
};
let (mut batch_result, explain) = match if self.main_phase.aggregate.is_empty() {
self.main_phase
.run(unsafe_cols, self.explain, show, id, partition.len())
self.main_phase.run(
unsafe_cols,
self.explain,
show,
id,
partition.len(),
self.batch_size,
)
} else {
self.main_phase
.run_aggregate(unsafe_cols, self.explain, show, id, partition.len())
.run_aggregate(unsafe_cols, self.explain, show, id, partition.len(), self.batch_size)
} {
Ok(result) => result,
Err(error) => {
Expand All @@ -192,7 +202,7 @@ impl QueryTask {
// Merge only with previous batch results of same level to get O(n log n) complexity
while let Some(br) = batch_results.pop() {
if br.level == batch_result.level {
match combine(br, batch_result, self.combined_limit()) {
match combine(br, batch_result, self.combined_limit(), self.batch_size) {
Ok(result) => batch_result = result,
Err(error) => {
self.fail_with(error);
Expand All @@ -214,7 +224,7 @@ impl QueryTask {
}
}

match QueryTask::combine_results(batch_results, self.combined_limit()) {
match QueryTask::combine_results(batch_results, self.combined_limit(), self.batch_size) {
Ok(Some(result)) => self.push_result(result, rows_scanned, rows_collected, explains),
Err(error) => self.fail_with(error),
_ => {}
Expand All @@ -226,11 +236,12 @@ impl QueryTask {
fn combine_results(
batch_results: Vec<BatchResult>,
limit: usize,
batch_size: usize,
) -> Result<Option<BatchResult>, QueryError> {
let mut full_result = None;
for batch_result in batch_results {
if let Some(partial) = full_result {
full_result = Some(combine(partial, batch_result, limit)?);
full_result = Some(combine(partial, batch_result, limit, batch_size)?);
} else {
full_result = Some(batch_result);
}
Expand Down Expand Up @@ -262,8 +273,11 @@ impl QueryTask {
{
let mut owned_results = Vec::with_capacity(0);
mem::swap(&mut owned_results, &mut state.partial_results);
let full_result = match QueryTask::combine_results(owned_results, self.combined_limit())
{
let full_result = match QueryTask::combine_results(
owned_results,
self.combined_limit(),
self.batch_size,
) {
Ok(result) => result.unwrap(),
Err(error) => {
self.fail_with_no_lock(error);
Expand All @@ -285,6 +299,7 @@ impl QueryTask {
!self.show.is_empty(),
0xdead_beef,
cols.iter().next().map(|(_, c)| c.len()).unwrap_or(0),
self.batch_size,
)
.unwrap()
.0;
Expand Down Expand Up @@ -451,7 +466,7 @@ impl BasicTypeColumn {
| EncodingType::NullableU32
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr
| EncodingType::OptStr
| EncodingType::OptF64 => {
let mut vals = vec![];
for i in 0..data.len() {
Expand Down
4 changes: 2 additions & 2 deletions src/engine/planning/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ pub struct QueryPlanner {
}

impl QueryPlanner {
pub fn prepare<'a>(&mut self, mut constant_vecs: Vec<BoxedData<'a>>) -> Result<QueryExecutor<'a>, QueryError> {
pub fn prepare<'a>(&mut self, mut constant_vecs: Vec<BoxedData<'a>>, batch_size: usize) -> Result<QueryExecutor<'a>, QueryError> {
self.perform_rewrites();

let mut result = QueryExecutor::default();
let mut result = QueryExecutor::new(batch_size);
result.set_buffer_count(self.buffer_provider.buffer_count());
for operation in &self.operations {
prepare(operation.clone(), &mut constant_vecs, &mut result)?;
Expand Down
6 changes: 4 additions & 2 deletions src/engine/planning/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl NormalFormQuery {
show: bool,
partition: usize,
partition_len: usize,
batch_size: usize,
) -> Result<(BatchResult<'a>, Option<String>), QueryError> {
let limit = (self.limit.limit + self.limit.offset) as usize;
let mut planner = QueryPlanner::default();
Expand Down Expand Up @@ -138,7 +139,7 @@ impl NormalFormQuery {
for c in columns {
debug!("{}: {:?}", partition, c);
}
let mut executor = planner.prepare(vec![])?;
let mut executor = planner.prepare(vec![], batch_size)?;
let mut results = executor.prepare(NormalFormQuery::column_data(columns));
debug!("{:#}", &executor);
executor.run(partition_len, &mut results, show)?;
Expand Down Expand Up @@ -171,6 +172,7 @@ impl NormalFormQuery {
show: bool,
partition: usize,
partition_len: usize,
batch_size: usize,
) -> Result<(BatchResult<'a>, Option<String>), QueryError> {
let mut qp = QueryPlanner::default();

Expand Down Expand Up @@ -370,7 +372,7 @@ impl NormalFormQuery {
for c in columns {
debug!("{}: {:?}", partition, c);
}
let mut executor = qp.prepare(vec![])?;
let mut executor = qp.prepare(vec![], batch_size)?;
let mut results = executor.prepare(NormalFormQuery::column_data(columns));
debug!("{:#}", &executor);
executor.run(partition_len, &mut results, show)?;
Expand Down
Loading

0 comments on commit 33b84db

Please sign in to comment.