Skip to content

Commit

Permalink
Fix read of bitvec data sections
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 13, 2024
1 parent 22086ff commit 4d55981
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 29 deletions.
12 changes: 11 additions & 1 deletion src/engine/operators/column_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub struct ReadColumnData {
pub colname: String,
pub section_index: usize,
pub output: BufferRef<Any>,
pub is_bitvec: bool,

pub current_index: usize,
pub batch_size: usize,
Expand All @@ -14,8 +15,17 @@ pub struct ReadColumnData {
impl<'a> VecOperator<'a> for ReadColumnData {
fn execute(&mut self, streaming: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
let data_section = scratchpad.get_column_data(&self.colname, self.section_index);
if self.is_bitvec {
// Basic sanity check, this will panic if the data is not u8
data_section.cast_ref_u8();
assert!(self.current_index & 7 == 0, "Bitvec read must be aligned to byte boundary");
}
let end = if streaming { self.current_index + self.batch_size } else { data_section.len() };
let result = data_section.slice_box(self.current_index, end);
let result = if self.is_bitvec {
data_section.slice_box((self.current_index + 7) / 8, (end + 7) / 8)
} else {
data_section.slice_box(self.current_index, end)
};
self.current_index += self.batch_size;
scratchpad.set_any(self.output, result);
self.has_more = end < data_section.len();
Expand Down
2 changes: 2 additions & 0 deletions src/engine/operators/vector_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,13 @@ pub mod operator {
colname: String,
section_index: usize,
output: BufferRef<Any>,
is_bitvec: bool,
) -> BoxedOperator<'a> {
Box::new(ReadColumnData {
colname,
section_index,
output,
is_bitvec,
batch_size: 0,
current_index: 0,
has_more: true,
Expand Down
6 changes: 4 additions & 2 deletions src/engine/planning/query_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub enum QueryPlan {
ColumnSection {
name: String,
section: usize,
is_bitvec: bool,
#[nohash]
range: Option<(i64, i64)>,
#[output(t = "base=provided")]
Expand Down Expand Up @@ -920,7 +921,7 @@ impl QueryPlan {
Ok(match *expr {
ColName(ref name) => match columns.get::<str>(name.as_ref()) {
Some(c) => {
let mut plan = planner.column_section(name, 0, c.range(), c.encoding_type());
let mut plan = planner.column_section(name, 0, false, c.range(), c.encoding_type());
let mut t = c.full_type();
if !c.codec().is_elementwise_decodable() {
let (codec, fixed_width) = c.codec().ensure_fixed_width(plan, planner);
Expand Down Expand Up @@ -1615,9 +1616,10 @@ pub(super) fn prepare<'a>(
QueryPlan::ColumnSection {
name,
section,
is_bitvec,
column_section,
..
} => operator::read_column_data(name, section, column_section.any()),
} => operator::read_column_data(name, section, column_section.any(), is_bitvec),
QueryPlan::AssembleNullable {
data,
present,
Expand Down
7 changes: 7 additions & 0 deletions src/ingest/colgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pub fn int_weighted(values: Vec<i64>, weights: Vec<f64>) -> Box<dyn ColumnGenera
})
}

pub fn nullable_ints(values: Vec<Option<i64>>, weights: Vec<f64>) -> Box<dyn ColumnGenerator> {
Box::new(Weighted {
elem: values,
weights,
})
}

pub fn incrementing_int() -> Box<dyn ColumnGenerator> {
Box::new(IncrementingInteger)
}
Expand Down
32 changes: 17 additions & 15 deletions src/mem_store/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ impl Codec {
}
CodecOp::Delta(_) => planner.delta_decode(stack.pop().unwrap()).into(),
CodecOp::ToI64(_) => planner.cast(stack.pop().unwrap(), EncodingType::I64),
CodecOp::PushDataSection(section_index) => planner.column_section(
CodecOp::PushDataSection(section_index, is_bitvec) => planner.column_section(
&self.column_name,
section_index,
is_bitvec,
None,
self.section_types[section_index],
),
Expand Down Expand Up @@ -212,13 +213,13 @@ impl Codec {
planner: &mut QueryPlanner,
) -> BufferRef<Scalar<i64>> {
match self.ops[..] {
[CodecOp::PushDataSection(1), CodecOp::PushDataSection(2), CodecOp::DictLookup(_)] => {
[CodecOp::PushDataSection(1, false), CodecOp::PushDataSection(2, false), CodecOp::DictLookup(_)] => {
let offset_len = planner
.column_section(&self.column_name, 1, None, EncodingType::U64)
.column_section(&self.column_name, 1, false, None, EncodingType::U64)
.u64()
.unwrap();
let backing_store = planner
.column_section(&self.column_name, 2, None, EncodingType::U8)
.column_section(&self.column_name, 2, false, None, EncodingType::U8)
.u8()
.unwrap();
planner.inverse_dict_lookup(offset_len, backing_store, string_const)
Expand Down Expand Up @@ -309,7 +310,8 @@ pub enum CodecOp {
Add(EncodingType, i64),
Delta(EncodingType),
ToI64(EncodingType),
PushDataSection(usize),
// (section_index, is_bitvec)
PushDataSection(usize, bool),
DictLookup(EncodingType),
LZ4(EncodingType, usize),
UnpackStrings,
Expand Down Expand Up @@ -345,7 +347,7 @@ impl CodecOp {
CodecOp::LZ4(t, _) => *t,
CodecOp::UnpackStrings => EncodingType::Str,
CodecOp::UnhexpackStrings(_, _) => EncodingType::Str,
CodecOp::PushDataSection(i) => section_types[*i],
CodecOp::PushDataSection(i, _) => section_types[*i],
CodecOp::Unknown => panic!("Unknown.output_type()"),
};
type_stack.push(t);
Expand All @@ -359,7 +361,7 @@ impl CodecOp {
CodecOp::Add(_, x) => *x == 0,
CodecOp::Delta(_) => false,
CodecOp::ToI64(_) => true,
CodecOp::PushDataSection(_) => true,
CodecOp::PushDataSection(_, _) => true,
CodecOp::DictLookup(_) => false,
CodecOp::LZ4(_, _) => false,
CodecOp::UnpackStrings => false,
Expand All @@ -374,7 +376,7 @@ impl CodecOp {
CodecOp::Add(_, _) => true,
CodecOp::Delta(_) => false,
CodecOp::ToI64(_) => true,
CodecOp::PushDataSection(_) => true,
CodecOp::PushDataSection(_, _) => true,
CodecOp::DictLookup(_) => true,
CodecOp::LZ4(_, _) => false,
CodecOp::UnpackStrings => false,
Expand All @@ -390,7 +392,7 @@ impl CodecOp {
CodecOp::Add(_, _) => true,
CodecOp::Delta(_) => false,
CodecOp::ToI64(_) => true,
CodecOp::PushDataSection(_) => true,
CodecOp::PushDataSection(_, _) => true,
CodecOp::DictLookup(_) => true,
CodecOp::LZ4(_, _) => false,
CodecOp::UnpackStrings => false,
Expand All @@ -405,7 +407,7 @@ impl CodecOp {
CodecOp::Add(_, _) => 1,
CodecOp::Delta(_) => 1,
CodecOp::ToI64(_) => 1,
CodecOp::PushDataSection(_) => 0,
CodecOp::PushDataSection(_, _) => 0,
CodecOp::DictLookup(_) => 3,
CodecOp::LZ4(_, _) => 1,
CodecOp::UnpackStrings => 1,
Expand All @@ -426,7 +428,7 @@ impl CodecOp {
}
CodecOp::Delta(t) => format!("Delta({:?})", t),
CodecOp::ToI64(t) => format!("ToI64({:?})", t),
CodecOp::PushDataSection(i) => format!("Data({})", i),
CodecOp::PushDataSection(i, is_bitvec) => format!("Data({}{})", i, if *is_bitvec { " bitvec" } else { "" }),
CodecOp::DictLookup(t) => format!("Dict({:?})", t),
CodecOp::LZ4(t, decoded_len) => {
if alternate {
Expand All @@ -450,9 +452,9 @@ mod tests {
fn test_ensure_property() {
let codec = vec![
CodecOp::LZ4(EncodingType::U16, 20),
CodecOp::PushDataSection(1),
CodecOp::PushDataSection(1, false),
CodecOp::LZ4(EncodingType::U64, 1),
CodecOp::PushDataSection(2),
CodecOp::PushDataSection(2, false),
CodecOp::LZ4(EncodingType::U8, 3),
CodecOp::DictLookup(EncodingType::U16),
];
Expand All @@ -465,9 +467,9 @@ mod tests {
assert_eq!(
rest,
vec![
CodecOp::PushDataSection(1),
CodecOp::PushDataSection(1, false),
CodecOp::LZ4(EncodingType::U64, 1),
CodecOp::PushDataSection(2),
CodecOp::PushDataSection(2, false),
CodecOp::LZ4(EncodingType::U8, 3),
CodecOp::DictLookup(EncodingType::U16),
]
Expand Down
2 changes: 1 addition & 1 deletion src/mem_store/floats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl FloatColumn {
name,
values.len(),
None,
vec![CodecOp::PushDataSection(1), CodecOp::Nullable],
vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable],
vec![values.into(), present.into()],
),
None => Column::new(
Expand Down
12 changes: 6 additions & 6 deletions src/mem_store/integers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ impl IntegerColumn {
name,
values.len(),
original_range,
vec![CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1), CodecOp::Nullable],
vec![CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1, true), CodecOp::Nullable],
vec![values.into(), present.into()])
} else {
Column::new(
name,
values.len(),
original_range,
vec![CodecOp::PushDataSection(1), CodecOp::Nullable],
vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable],
vec![values.into(), present.into()])
}
None => if delta_encode {
Expand Down Expand Up @@ -101,10 +101,10 @@ impl IntegerColumn {
let len = values.len();
let codec = if null_map.is_some() {
match (offset == 0, delta_encode) {
(true, true) => vec![CodecOp::Delta(t), CodecOp::PushDataSection(1), CodecOp::Nullable],
(true, false) => vec![CodecOp::PushDataSection(1), CodecOp::Nullable, CodecOp::ToI64(t)],
(false, true) => vec![CodecOp::Add(t, offset), CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1), CodecOp::Nullable],
(false, false) => vec![CodecOp::PushDataSection(1), CodecOp::Nullable, CodecOp::Add(t, offset)],
(true, true) => vec![CodecOp::Delta(t), CodecOp::PushDataSection(1, true), CodecOp::Nullable],
(true, false) => vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable, CodecOp::ToI64(t)],
(false, true) => vec![CodecOp::Add(t, offset), CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1, true), CodecOp::Nullable],
(false, false) => vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable, CodecOp::Add(t, offset)],
}
} else {
match (offset == 0, delta_encode) {
Expand Down
8 changes: 4 additions & 4 deletions src/mem_store/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
(string_pack_codec(), DataSection::U8(packed.into_vec()))
};
let mut column = if let Some(present) = present {
codec.push(CodecOp::PushDataSection(1));
codec.push(CodecOp::PushDataSection(1, true));
codec.push(CodecOp::Nullable);
Column::new(name, len, None, codec, vec![data, DataSection::U8(present)])
} else {
Expand Down Expand Up @@ -120,7 +120,7 @@ where
)
};
if let Some(present) = present {
codec.insert(0, CodecOp::PushDataSection(3));
codec.insert(0, CodecOp::PushDataSection(3, true));
codec.insert(1, CodecOp::Nullable);
data_sections.push(DataSection::U8(present));
}
Expand All @@ -131,8 +131,8 @@ where

pub fn dict_codec(index_type: EncodingType) -> Vec<CodecOp> {
vec![
CodecOp::PushDataSection(1),
CodecOp::PushDataSection(2),
CodecOp::PushDataSection(1, false),
CodecOp::PushDataSection(2, false),
CodecOp::DictLookup(index_type),
]
}
Expand Down
26 changes: 26 additions & 0 deletions tests/query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,32 @@ fn test_column_with_null_partitions() {
);
}

#[test]
fn test_long_nullable() {
let _ = env_logger::try_init();
let locustdb = LocustDB::memory_only();
let _ = block_on(locustdb.gen_table(locustdb::colgen::GenTable {
name: "test".to_string(),
partitions: 8,
partition_size: 2 << 14,
columns: vec![(
"nullable_int".to_string(),
locustdb::colgen::nullable_ints(
vec![
None,
Some(1),
Some(-10),
],
vec![0.9, 0.05, 0.05],
),
)],
}));
let query = "SELECT nullable_int FROM test LIMIT 0;";
let expected_rows : Vec<[Value; 1]> = vec![];
let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap();
assert_eq!(result.unwrap().rows.unwrap(), expected_rows);
}

#[test]
fn test_group_by_string() {
use crate::value_syntax::*;
Expand Down

0 comments on commit 4d55981

Please sign in to comment.