diff --git a/src/engine/operators/column_ops.rs b/src/engine/operators/column_ops.rs index 6da80c59..7e8c1282 100644 --- a/src/engine/operators/column_ops.rs +++ b/src/engine/operators/column_ops.rs @@ -5,6 +5,7 @@ pub struct ReadColumnData { pub colname: String, pub section_index: usize, pub output: BufferRef, + pub is_bitvec: bool, pub current_index: usize, pub batch_size: usize, @@ -14,8 +15,17 @@ pub struct ReadColumnData { impl<'a> VecOperator<'a> for ReadColumnData { fn execute(&mut self, streaming: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { let data_section = scratchpad.get_column_data(&self.colname, self.section_index); + if self.is_bitvec { + // Basic sanity check, this will panic if the data is not u8 + data_section.cast_ref_u8(); + assert!(self.current_index & 7 == 0, "Bitvec read must be aligned to byte boundary"); + } let end = if streaming { self.current_index + self.batch_size } else { data_section.len() }; - let result = data_section.slice_box(self.current_index, end); + let result = if self.is_bitvec { + data_section.slice_box((self.current_index + 7) / 8, (end + 7) / 8) + } else { + data_section.slice_box(self.current_index, end) + }; self.current_index += self.batch_size; scratchpad.set_any(self.output, result); self.has_more = end < data_section.len(); diff --git a/src/engine/operators/vector_operator.rs b/src/engine/operators/vector_operator.rs index 80ead58d..bc52cd05 100644 --- a/src/engine/operators/vector_operator.rs +++ b/src/engine/operators/vector_operator.rs @@ -134,11 +134,13 @@ pub mod operator { colname: String, section_index: usize, output: BufferRef, + is_bitvec: bool, ) -> BoxedOperator<'a> { Box::new(ReadColumnData { colname, section_index, output, + is_bitvec, batch_size: 0, current_index: 0, has_more: true, diff --git a/src/engine/planning/query_plan.rs b/src/engine/planning/query_plan.rs index ce73ad7f..34eac675 100644 --- a/src/engine/planning/query_plan.rs +++ b/src/engine/planning/query_plan.rs @@ -23,6 +23,7 @@ pub enum QueryPlan { ColumnSection { name: String, section: usize, + is_bitvec: bool, #[nohash] range: Option<(i64, i64)>, #[output(t = "base=provided")] @@ -920,7 +921,7 @@ impl QueryPlan { Ok(match *expr { ColName(ref name) => match columns.get::(name.as_ref()) { Some(c) => { - let mut plan = planner.column_section(name, 0, c.range(), c.encoding_type()); + let mut plan = planner.column_section(name, 0, false, c.range(), c.encoding_type()); let mut t = c.full_type(); if !c.codec().is_elementwise_decodable() { let (codec, fixed_width) = c.codec().ensure_fixed_width(plan, planner); @@ -1615,9 +1616,10 @@ pub(super) fn prepare<'a>( QueryPlan::ColumnSection { name, section, + is_bitvec, column_section, .. - } => operator::read_column_data(name, section, column_section.any()), + } => operator::read_column_data(name, section, column_section.any(), is_bitvec), QueryPlan::AssembleNullable { data, present, diff --git a/src/ingest/colgen.rs b/src/ingest/colgen.rs index db75c7f9..bbeaaee3 100644 --- a/src/ingest/colgen.rs +++ b/src/ingest/colgen.rs @@ -40,6 +40,13 @@ pub fn int_weighted(values: Vec, weights: Vec) -> Box>, weights: Vec) -> Box { + Box::new(Weighted { + elem: values, + weights, + }) +} + pub fn incrementing_int() -> Box { Box::new(IncrementingInteger) } diff --git a/src/mem_store/codec.rs b/src/mem_store/codec.rs index ed2a219b..dad18694 100644 --- a/src/mem_store/codec.rs +++ b/src/mem_store/codec.rs @@ -133,9 +133,10 @@ impl Codec { } CodecOp::Delta(_) => planner.delta_decode(stack.pop().unwrap()).into(), CodecOp::ToI64(_) => planner.cast(stack.pop().unwrap(), EncodingType::I64), - CodecOp::PushDataSection(section_index) => planner.column_section( + CodecOp::PushDataSection(section_index, is_bitvec) => planner.column_section( &self.column_name, section_index, + is_bitvec, None, self.section_types[section_index], ), @@ -212,13 +213,13 @@ impl Codec { planner: &mut QueryPlanner, ) -> BufferRef> { match self.ops[..] { - [CodecOp::PushDataSection(1), CodecOp::PushDataSection(2), CodecOp::DictLookup(_)] => { + [CodecOp::PushDataSection(1, false), CodecOp::PushDataSection(2, false), CodecOp::DictLookup(_)] => { let offset_len = planner - .column_section(&self.column_name, 1, None, EncodingType::U64) + .column_section(&self.column_name, 1, false, None, EncodingType::U64) .u64() .unwrap(); let backing_store = planner - .column_section(&self.column_name, 2, None, EncodingType::U8) + .column_section(&self.column_name, 2, false, None, EncodingType::U8) .u8() .unwrap(); planner.inverse_dict_lookup(offset_len, backing_store, string_const) @@ -309,7 +310,8 @@ pub enum CodecOp { Add(EncodingType, i64), Delta(EncodingType), ToI64(EncodingType), - PushDataSection(usize), + // (section_index, is_bitvec) + PushDataSection(usize, bool), DictLookup(EncodingType), LZ4(EncodingType, usize), UnpackStrings, @@ -345,7 +347,7 @@ impl CodecOp { CodecOp::LZ4(t, _) => *t, CodecOp::UnpackStrings => EncodingType::Str, CodecOp::UnhexpackStrings(_, _) => EncodingType::Str, - CodecOp::PushDataSection(i) => section_types[*i], + CodecOp::PushDataSection(i, _) => section_types[*i], CodecOp::Unknown => panic!("Unknown.output_type()"), }; type_stack.push(t); @@ -359,7 +361,7 @@ impl CodecOp { CodecOp::Add(_, x) => *x == 0, CodecOp::Delta(_) => false, CodecOp::ToI64(_) => true, - CodecOp::PushDataSection(_) => true, + CodecOp::PushDataSection(_, _) => true, CodecOp::DictLookup(_) => false, CodecOp::LZ4(_, _) => false, CodecOp::UnpackStrings => false, @@ -374,7 +376,7 @@ impl CodecOp { CodecOp::Add(_, _) => true, CodecOp::Delta(_) => false, CodecOp::ToI64(_) => true, - CodecOp::PushDataSection(_) => true, + CodecOp::PushDataSection(_, _) => true, CodecOp::DictLookup(_) => true, CodecOp::LZ4(_, _) => false, CodecOp::UnpackStrings => false, @@ -390,7 +392,7 @@ impl CodecOp { CodecOp::Add(_, _) => true, CodecOp::Delta(_) => false, CodecOp::ToI64(_) => true, - CodecOp::PushDataSection(_) => true, + CodecOp::PushDataSection(_, _) => true, CodecOp::DictLookup(_) => true, CodecOp::LZ4(_, _) => false, CodecOp::UnpackStrings => false, @@ -405,7 +407,7 @@ impl CodecOp { CodecOp::Add(_, _) => 1, CodecOp::Delta(_) => 1, CodecOp::ToI64(_) => 1, - CodecOp::PushDataSection(_) => 0, + CodecOp::PushDataSection(_, _) => 0, CodecOp::DictLookup(_) => 3, CodecOp::LZ4(_, _) => 1, CodecOp::UnpackStrings => 1, @@ -426,7 +428,7 @@ impl CodecOp { } CodecOp::Delta(t) => format!("Delta({:?})", t), CodecOp::ToI64(t) => format!("ToI64({:?})", t), - CodecOp::PushDataSection(i) => format!("Data({})", i), + CodecOp::PushDataSection(i, is_bitvec) => format!("Data({}{})", i, if *is_bitvec { " bitvec" } else { "" }), CodecOp::DictLookup(t) => format!("Dict({:?})", t), CodecOp::LZ4(t, decoded_len) => { if alternate { @@ -450,9 +452,9 @@ mod tests { fn test_ensure_property() { let codec = vec![ CodecOp::LZ4(EncodingType::U16, 20), - CodecOp::PushDataSection(1), + CodecOp::PushDataSection(1, false), CodecOp::LZ4(EncodingType::U64, 1), - CodecOp::PushDataSection(2), + CodecOp::PushDataSection(2, false), CodecOp::LZ4(EncodingType::U8, 3), CodecOp::DictLookup(EncodingType::U16), ]; @@ -465,9 +467,9 @@ mod tests { assert_eq!( rest, vec![ - CodecOp::PushDataSection(1), + CodecOp::PushDataSection(1, false), CodecOp::LZ4(EncodingType::U64, 1), - CodecOp::PushDataSection(2), + CodecOp::PushDataSection(2, false), CodecOp::LZ4(EncodingType::U8, 3), CodecOp::DictLookup(EncodingType::U16), ] diff --git a/src/mem_store/floats.rs b/src/mem_store/floats.rs index 66056154..8e5243fc 100644 --- a/src/mem_store/floats.rs +++ b/src/mem_store/floats.rs @@ -17,7 +17,7 @@ impl FloatColumn { name, values.len(), None, - vec![CodecOp::PushDataSection(1), CodecOp::Nullable], + vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable], vec![values.into(), present.into()], ), None => Column::new( diff --git a/src/mem_store/integers.rs b/src/mem_store/integers.rs index bdae2cd3..d96bc658 100644 --- a/src/mem_store/integers.rs +++ b/src/mem_store/integers.rs @@ -57,14 +57,14 @@ impl IntegerColumn { name, values.len(), original_range, - vec![CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1), CodecOp::Nullable], + vec![CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1, true), CodecOp::Nullable], vec![values.into(), present.into()]) } else { Column::new( name, values.len(), original_range, - vec![CodecOp::PushDataSection(1), CodecOp::Nullable], + vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable], vec![values.into(), present.into()]) } None => if delta_encode { @@ -101,10 +101,10 @@ impl IntegerColumn { let len = values.len(); let codec = if null_map.is_some() { match (offset == 0, delta_encode) { - (true, true) => vec![CodecOp::Delta(t), CodecOp::PushDataSection(1), CodecOp::Nullable], - (true, false) => vec![CodecOp::PushDataSection(1), CodecOp::Nullable, CodecOp::ToI64(t)], - (false, true) => vec![CodecOp::Add(t, offset), CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1), CodecOp::Nullable], - (false, false) => vec![CodecOp::PushDataSection(1), CodecOp::Nullable, CodecOp::Add(t, offset)], + (true, true) => vec![CodecOp::Delta(t), CodecOp::PushDataSection(1, true), CodecOp::Nullable], + (true, false) => vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable, CodecOp::ToI64(t)], + (false, true) => vec![CodecOp::Add(t, offset), CodecOp::Delta(EncodingType::I64), CodecOp::PushDataSection(1, true), CodecOp::Nullable], + (false, false) => vec![CodecOp::PushDataSection(1, true), CodecOp::Nullable, CodecOp::Add(t, offset)], } } else { match (offset == 0, delta_encode) { diff --git a/src/mem_store/strings.rs b/src/mem_store/strings.rs index d52bf2cd..9a388b5e 100644 --- a/src/mem_store/strings.rs +++ b/src/mem_store/strings.rs @@ -45,7 +45,7 @@ where (string_pack_codec(), DataSection::U8(packed.into_vec())) }; let mut column = if let Some(present) = present { - codec.push(CodecOp::PushDataSection(1)); + codec.push(CodecOp::PushDataSection(1, true)); codec.push(CodecOp::Nullable); Column::new(name, len, None, codec, vec![data, DataSection::U8(present)]) } else { @@ -120,7 +120,7 @@ where ) }; if let Some(present) = present { - codec.insert(0, CodecOp::PushDataSection(3)); + codec.insert(0, CodecOp::PushDataSection(3, true)); codec.insert(1, CodecOp::Nullable); data_sections.push(DataSection::U8(present)); } @@ -131,8 +131,8 @@ where pub fn dict_codec(index_type: EncodingType) -> Vec { vec![ - CodecOp::PushDataSection(1), - CodecOp::PushDataSection(2), + CodecOp::PushDataSection(1, false), + CodecOp::PushDataSection(2, false), CodecOp::DictLookup(index_type), ] } diff --git a/tests/query_tests.rs b/tests/query_tests.rs index 350acdd1..1c792673 100644 --- a/tests/query_tests.rs +++ b/tests/query_tests.rs @@ -1084,6 +1084,32 @@ fn test_column_with_null_partitions() { ); } +#[test] +fn test_long_nullable() { + let _ = env_logger::try_init(); + let locustdb = LocustDB::memory_only(); + let _ = block_on(locustdb.gen_table(locustdb::colgen::GenTable { + name: "test".to_string(), + partitions: 8, + partition_size: 2 << 14, + columns: vec![( + "nullable_int".to_string(), + locustdb::colgen::nullable_ints( + vec![ + None, + Some(1), + Some(-10), + ], + vec![0.9, 0.05, 0.05], + ), + )], + })); + let query = "SELECT nullable_int FROM test LIMIT 0;"; + let expected_rows : Vec<[Value; 1]> = vec![]; + let result = block_on(locustdb.run_query(query, true, true, vec![])).unwrap(); + assert_eq!(result.unwrap().rows.unwrap(), expected_rows); +} + #[test] fn test_group_by_string() { use crate::value_syntax::*;