Skip to content

Commit

Permalink
preserve row ordering during compaction, tons of bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 19, 2024
1 parent cdf14ff commit 565863b
Show file tree
Hide file tree
Showing 23 changed files with 402 additions and 159 deletions.
1 change: 1 addition & 0 deletions locustdb-derive/src/reify_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ fn types(t: &Ident) -> Option<Vec<Type>> {
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]),
"PrimitiveOrVal" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64, Type::Val]),
"VecData" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::OptF64, Type::Val, Type::Bitvec]),
"VecDataNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::OptF64, Type::Val, Type::Bitvec]),
"NullablePrimitive" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64, Type::NullableStr]),
"PrimitiveUSize" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::USize]),
"PrimitiveNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::Str]),
Expand Down
1 change: 1 addition & 0 deletions src/bin/repl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ fn main() {
max_partition_size_bytes,
partition_combine_factor: 4,
batch_size,
max_partition_length: 1024 * 1024,
};

if options.readahead > options.mem_size_limit_tables {
Expand Down
1 change: 1 addition & 0 deletions src/disk_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub type PartitionID = u64;
pub struct PartitionMetadata {
pub id: PartitionID,
pub tablename: String,
pub offset: usize,
pub len: usize,
pub subpartitions: Vec<SubpartitionMetadata>,
}
Expand Down
6 changes: 4 additions & 2 deletions src/disk_store/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use super::file_writer::{BlobWriter, FileBlobWriter};
use super::{ColumnLoader, PartitionMetadata, SubpartitionMetadata};
use crate::logging_client::EventBuffer;
use crate::mem_store::Column;
use crate::mem_store::{Column, DataSource};
use crate::perf_counter::{PerfCounter, QueryPerfCounter};

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -236,14 +236,16 @@ impl Storage {
metadata: Vec<SubpartitionMetadata>,
subpartitions: Vec<Vec<Arc<Column>>>,
old_partitions: &[PartitionID],
offset: usize,
) {
log::debug!("compacting {} parititions into {} for table {}", old_partitions.len(), id, table);

// Persist new partition files
let partition = PartitionMetadata {
id,
tablename: table.to_string(),
len: subpartitions[0].len(),
len: subpartitions[0][0].len(),
offset,
subpartitions: metadata,
};
self.write_subpartitions(&partition, subpartitions);
Expand Down
33 changes: 33 additions & 0 deletions src/engine/data_types/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,24 +556,41 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for &'a [T] {
default fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
panic!("{}", self.type_error("cast_ref_opt_f64"))
}
default fn to_mixed(&self) -> Vec<Val<'a>> {
panic!("{}", self.type_error("to_mixed"))
}
}

impl<'a> Data<'a> for &'a [&'a str] {
    fn cast_ref_str(&self) -> &[&'a str] {
        self
    }
    /// Lifts every borrowed string into the mixed-type representation as `Val::Str`.
    fn to_mixed(&self) -> Vec<Val<'a>> {
        self.iter().copied().map(Val::Str).collect()
    }
}

impl<'a> Data<'a> for &'a [Option<&'a str>] {
    fn cast_ref_opt_str(&self) -> &[Option<&'a str>] {
        self
    }
    /// Converts to the mixed-type representation: `None` becomes `Val::Null`,
    /// `Some(s)` becomes `Val::Str(s)`.
    fn to_mixed(&self) -> Vec<Val<'a>> {
        self.iter()
            .copied()
            .map(|opt| opt.map_or(Val::Null, Val::Str))
            .collect()
    }
}

impl<'a> Data<'a> for &'a [Val<'a>] {
    fn cast_ref_mixed(&self) -> &[Val<'a>] {
        self
    }
    /// Already mixed-typed; clone each element into an owned vector.
    fn to_mixed(&self) -> Vec<Val<'a>> {
        self.iter().cloned().collect()
    }
}

impl<'a> Data<'a> for &'a [usize] {
Expand All @@ -586,6 +603,9 @@ impl<'a> Data<'a> for &'a [i64] {
fn cast_ref_i64(&self) -> &[i64] {
self
}
fn to_mixed(&self) -> Vec<Val<'a>> {
self.iter().map(|i| Val::Integer(*i)).collect()
}
}

impl<'a> Data<'a> for &'a [u64] {
Expand Down Expand Up @@ -616,12 +636,25 @@ impl<'a> Data<'a> for &'a [OrderedFloat<f64>] {
fn cast_ref_f64(&self) -> &[OrderedFloat<f64>] {
self
}

fn to_mixed(&self) -> Vec<Val<'a>> {
self.iter().map(|i| Val::Float(*i)).collect()
}
}

impl<'a> Data<'a> for &'a [Option<OrderedFloat<f64>>] {
    fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
        self
    }

    /// Converts to the mixed-type representation: `None` becomes `Val::Null`,
    /// `Some(f)` becomes `Val::Float(f)`.
    fn to_mixed(&self) -> Vec<Val<'a>> {
        self.iter()
            .copied()
            .map(|opt| opt.map_or(Val::Null, Val::Float))
            .collect()
    }
}

impl<'a> Data<'a> for &'a [MergeOp] {
Expand Down
53 changes: 33 additions & 20 deletions src/engine/execution/batch_merging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::errors::QueryError;
use crate::mem_store::column::DataSource;
use std::cmp::min;
use std::collections::HashMap;
use std::ops::Range;
use std::result::Result;
use std::sync::Arc;
use std::usize;
Expand All @@ -14,6 +15,7 @@ pub struct BatchResult<'a> {
pub aggregations: Vec<(usize, Aggregator)>,
pub order_by: Vec<(usize, bool)>,
pub level: u32,
pub scanned_range: Range<usize>,
pub batch_count: usize,
pub show: bool,
// Buffers that are referenced by query result - unsafe to drop before results are converted into owned values
Expand Down Expand Up @@ -68,6 +70,12 @@ pub fn combine<'a>(
limit: usize,
batch_size: usize,
) -> Result<BatchResult<'a>, QueryError> {
ensure!(
batch1.scanned_range.end == batch2.scanned_range.start,
"Scanned ranges do not match in left ({:?}) and right ({:?}) batch result.",
batch1.scanned_range,
batch2.scanned_range,
);
ensure!(
batch1.projection.len() == batch2.projection.len(),
"Unequal number of projections in left ({}) and right ({}) batch result.",
Expand Down Expand Up @@ -168,6 +176,7 @@ pub fn combine<'a>(
aggregations,
order_by: vec![],
level: batch1.level + 1,
scanned_range: batch1.scanned_range.start..batch2.scanned_range.end,
batch_count: batch1.batch_count + batch2.batch_count,
show: batch1.show && batch2.show,
unsafe_referenced_buffers: {
Expand Down Expand Up @@ -207,24 +216,16 @@ pub fn combine<'a>(
let (merge_ops, merged_final_sort_col) = if batch1.order_by.len() == 1 {
let (index1, desc) = batch1.order_by[0];
let (index2, _) = batch2.order_by[0];
let mut left = left[index1];
let mut right = right[index2];
if left.tag == EncodingType::Null {
left = qp.cast(left, EncodingType::Val);
}
if right.tag == EncodingType::Null {
right = qp.cast(right, EncodingType::Val);
}
let left = null_to_val(&mut qp, left[index1]);
let right = null_to_val(&mut qp, right[index2]);
let (left, right) = unify_types(&mut qp, left, right);
qp.merge(left, right, limit, desc)
} else {
let (first_sort_col_index1, desc) = batch1.order_by[0];
let (first_sort_col_index2, _) = batch2.order_by[0];
let (l, r) = unify_types(
&mut qp,
left[first_sort_col_index1],
right[first_sort_col_index2],
);
let l = null_to_val(&mut qp, left[first_sort_col_index1]);
let r = null_to_val(&mut qp, right[first_sort_col_index2]);
let (l, r) = unify_types(&mut qp, l, r);
let mut partitioning = qp.partition(l, r, limit, desc);

for i in 1..(left.len() - 1) {
Expand All @@ -233,11 +234,9 @@ pub fn combine<'a>(
let (l, r) = unify_types(&mut qp, left[index1], right[index2]);
partitioning = qp.subpartition(partitioning, l, r, desc);
}
let (l, r) = unify_types(
&mut qp,
left[final_sort_col_index1],
right[final_sort_col_index2],
);
let l = null_to_val(&mut qp, left[final_sort_col_index1]);
let r = null_to_val(&mut qp, right[final_sort_col_index2]);
let (l, r) = unify_types(&mut qp, l, r);
qp.merge_partitioned(partitioning, l, r, limit, batch1.order_by.last().unwrap().1)
};

Expand All @@ -246,7 +245,9 @@ pub fn combine<'a>(
if ileft == final_sort_col_index1 && iright == final_sort_col_index2 {
projection.push(merged_final_sort_col.any());
} else {
let (l, r) = unify_types(&mut qp, left[ileft], right[iright]);
let l = null_to_val(&mut qp, left[ileft]);
let r = null_to_val(&mut qp, right[iright]);
let (l, r) = unify_types(&mut qp, l, r);
let merged = qp.merge_keep(merge_ops, l, r);
projection.push(merged.any());
}
Expand All @@ -256,7 +257,9 @@ pub fn combine<'a>(
.iter()
.zip(batch2.order_by.iter())
{
let (l, r) = unify_types(&mut qp, left[ileft], right[iright]);
let l = null_to_val(&mut qp, left[ileft]);
let r = null_to_val(&mut qp, right[iright]);
let (l, r) = unify_types(&mut qp, l, r);
let merged = qp.merge_keep(merge_ops, l, r);
order_by.push((merged.any(), desc));
}
Expand All @@ -276,6 +279,7 @@ pub fn combine<'a>(
level: batch1.level + 1,
batch_count: batch1.batch_count + batch2.batch_count,
show: batch1.show && batch2.show,
scanned_range: batch1.scanned_range.start..batch2.scanned_range.end,
unsafe_referenced_buffers: {
let mut urb = batch1.unsafe_referenced_buffers;
urb.extend(batch2.unsafe_referenced_buffers);
Expand Down Expand Up @@ -322,6 +326,7 @@ pub fn combine<'a>(
aggregations: vec![],
order_by: vec![],
level: batch1.level + 1,
scanned_range: batch1.scanned_range.start..batch2.scanned_range.end,
batch_count: batch1.batch_count + batch2.batch_count,
show: batch1.show && batch2.show,
unsafe_referenced_buffers: {
Expand All @@ -348,3 +353,11 @@ fn unify_types(
}
(left, right)
}

/// If `plan` carries the `Null` encoding, plans a cast to `Val` so it can be
/// unified with typed columns; any other encoding passes through unchanged.
fn null_to_val(qp: &mut QueryPlanner, plan: TypedBufferRef) -> TypedBufferRef {
    match plan.tag {
        EncodingType::Null => qp.cast(plan, EncodingType::Val),
        _ => plan,
    }
}
2 changes: 2 additions & 0 deletions src/engine/execution/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ impl TypedBufferRef {

pub fn is_nullable(&self) -> bool { self.tag.is_nullable() }

pub fn is_null(&self) -> bool { self.tag == EncodingType::Null }

pub fn is_constant(&self) -> bool { self.tag.is_constant() }

pub fn nullable_any(&self) -> Result<BufferRef<Nullable<Any>>, QueryError> {
Expand Down
Loading

0 comments on commit 565863b

Please sign in to comment.