From 0d0e1ce20db86551ac54758b6464fd9d8a5715f9 Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Tue, 12 Mar 2024 00:42:26 -0700 Subject: [PATCH] bugfixes --- locustdb-derive/src/reify_types.rs | 8 +- src/engine/data_types/data.rs | 44 +++++++ src/engine/data_types/nullable_vec_data.rs | 13 +- src/engine/data_types/types.rs | 75 +++++++++-- src/engine/data_types/vec_data.rs | 13 ++ src/engine/execution/buffer.rs | 12 ++ src/engine/execution/executor.rs | 4 +- src/engine/execution/query_task.rs | 4 +- src/engine/execution/scratchpad.rs | 4 + src/engine/operators/bool_op.rs | 64 ++++++--- src/engine/operators/comparator.rs | 12 ++ src/engine/operators/filter.rs | 2 + src/engine/operators/filter_nullable.rs | 116 +++++++++++++++++ src/engine/operators/fuse_nulls.rs | 79 +++++++++++ src/engine/operators/mod.rs | 2 + src/engine/operators/null_vec_like.rs | 54 ++++++++ src/engine/operators/type_conversion.rs | 4 +- src/engine/operators/vector_operator.rs | 91 ++++++++++--- src/engine/planning/query_plan.rs | 144 ++++++++++++++------- src/mem_store/column.rs | 3 +- src/mem_store/tree.rs | 3 +- test_data/edge_cases.csv | 22 ++-- tests/query_tests.rs | 117 +++++++++++++++-- 23 files changed, 763 insertions(+), 127 deletions(-) create mode 100644 src/engine/operators/filter_nullable.rs create mode 100644 src/engine/operators/null_vec_like.rs diff --git a/locustdb-derive/src/reify_types.rs b/locustdb-derive/src/reify_types.rs index a9a88be1..4a7f4d4c 100644 --- a/locustdb-derive/src/reify_types.rs +++ b/locustdb-derive/src/reify_types.rs @@ -205,7 +205,7 @@ fn types(t: &Ident) -> Option> { "Float" => Some(vec![Type::F64]), "NullableInteger" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64]), "NullableFloat" => Some(vec![Type::NullableF64]), - "Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr]), + "Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]), "NullablePrimitive" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64, Type::NullableStr]), "PrimitiveUSize" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::USize]), "PrimitiveNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::Str]), @@ -227,7 +227,9 @@ enum Type { I64, F64, Str, - OptStr, + + OptStr, // Option<&str>, used when sorting instead of representation of raw valls + present bit vec + OptF64, // Option>, used when sorting NullableU8, NullableU16, @@ -260,6 +262,7 @@ impl Type { Type::F64 => parse_quote!(EncodingType::F64), Type::Str => parse_quote!(EncodingType::Str), Type::OptStr => parse_quote!(EncodingType::OptStr), + Type::OptF64 => parse_quote!(EncodingType::OptF64), Type::NullableU8 => parse_quote!(EncodingType::NullableU8), Type::NullableU16 => parse_quote!(EncodingType::NullableU16), Type::NullableU32 => parse_quote!(EncodingType::NullableU32), @@ -289,6 +292,7 @@ impl Type { Type::F64 => parse_quote!( let #variable = #variable.buffer.f64(); ), Type::Str => parse_quote!( let #variable = #variable.buffer.str(); ), Type::OptStr => parse_quote!( let #variable = #variable.buffer.opt_str(); ), + Type::OptF64 => parse_quote!( let #variable = #variable.buffer.opt_f64(); ), Type::NullableU8 => parse_quote!( let #variable = #variable.buffer.nullable_u8(); ), Type::NullableU16 => parse_quote!( let #variable = #variable.buffer.nullable_u16(); ), Type::NullableU32 => parse_quote!( let #variable = #variable.buffer.nullable_u32(); ), diff --git a/src/engine/data_types/data.rs b/src/engine/data_types/data.rs index 43fc7490..7ee91511 100644 --- a/src/engine/data_types/data.rs +++ b/src/engine/data_types/data.rs @@ -38,6 +38,9 @@ pub trait Data<'a>: Send + Sync { fn cast_ref_f64(&self) -> &[OrderedFloat] { panic!("{}", self.type_error("cast_ref_f64")) } + fn cast_ref_opt_f64(&self) -> &[Option>] { + panic!("{}", self.type_error("cast_ref_opt_f64")) + } fn cast_ref_u32(&self) -> &[u32] { panic!("{}", self.type_error("cast_ref_u32")) } @@ -109,9 +112,15 @@ pub trait Data<'a>: Send + Sync { fn cast_ref_mut_f64(&mut self) -> &mut Vec> { panic!("{}", self.type_error("cast_ref_mut_f64")) } + fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec>> { + panic!("{}", self.type_error("cast_ref_mut_opt_f64")) + } fn cast_ref_mut_usize(&mut self) -> &mut Vec { panic!("{}", self.type_error("cast_ref_mut_usize")) } + fn cast_ref_mut_null(&mut self) -> &mut usize { + panic!("{}", self.type_error("cast_ref_mut_null")) + } fn cast_ref_mut_mixed(&mut self) -> &mut Vec> { panic!("{}", self.type_error("cast_ref_mut_mixed")) @@ -239,6 +248,9 @@ impl<'a, T: VecData + 'a> Data<'a> for Vec { default fn cast_ref_f64(&self) -> &[OrderedFloat] { panic!("{}", self.type_error("cast_ref_f64")) } + default fn cast_ref_opt_f64(&self) -> &[Option>] { + panic!("{}", self.type_error("cast_ref_opt_f64")) + } default fn cast_ref_usize(&self) -> &[usize] { panic!("{}", self.type_error("cast_ref_usize")) } @@ -278,6 +290,9 @@ impl<'a, T: VecData + 'a> Data<'a> for Vec { default fn cast_ref_mut_f64(&mut self) -> &mut Vec> { panic!("{}", self.type_error("cast_ref_mut_f64")) } + default fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec>> { + panic!("{}", self.type_error("cast_ref_mut_opt_f64")) + } default fn cast_ref_mut_usize(&mut self) -> &mut Vec { panic!("{}", self.type_error("cast_ref_mut_usize")) } @@ -426,6 +441,24 @@ impl<'a> Data<'a> for Vec> { } } +impl<'a> Data<'a> for Vec>> { + fn cast_ref_opt_f64(&self) -> &[Option>] { + self + } + fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec>> { + self + } + fn to_mixed(&self) -> Vec> { + self.iter() + .map(|s| match s { + None => Val::Null, + Some(s) => Val::Float(*s) + }) + .collect() + } +} + + impl<'a> Data<'a> for Vec { fn cast_ref_merge_op(&self) -> &[MergeOp] { self @@ -476,6 +509,13 @@ impl<'a, T: VecData + 'a> Data<'a> for &'a [T] { format!("&{:?}{}", T::t(), display_slice(self, 120)) } + fn make_nullable(&mut self, present: &[u8]) -> BoxedData<'a> { + Box::new(NullableVec { + data: self.to_vec(), + present: present.to_vec(), + }) + } + // Copied from Data and marked default because specialization demands it default fn cast_ref_merge_op(&self) -> &[MergeOp] { panic!("{}", self.type_error("cast_ref_merge_op")) @@ -603,6 +643,10 @@ impl<'a> Data<'a> for usize { } } + fn cast_ref_mut_null(&mut self) -> &mut usize { + self + } + fn to_mixed(&self) -> Vec> { vec![Val::Null; *self] } diff --git a/src/engine/data_types/nullable_vec_data.rs b/src/engine/data_types/nullable_vec_data.rs index 1d5d21c6..03a48cb0 100644 --- a/src/engine/data_types/nullable_vec_data.rs +++ b/src/engine/data_types/nullable_vec_data.rs @@ -31,11 +31,20 @@ impl<'a, T: VecData + 'a> Data<'a> for NullableVec { fn type_error(&self, func_name: &str) -> String { format!("NullableVec<{:?}>.{}", T::t(), func_name) } - fn slice_box<'b>(&'b self, _: usize, _: usize) -> BoxedData<'b> + fn slice_box<'b>(&'b self, from: usize, to: usize) -> BoxedData<'b> where 'a: 'b, { - panic!("nullable slice box!") + // TODO: more efficient implementation that doesn't clone? + let to = min(to, self.len()); + let data = self.data[from..to].to_vec(); + let mut present = vec![0u8; (to - from + 7) / 8]; + for i in from..to { + if self.present.is_set(i) { + present.set(i - from); + } + } + Box::new(NullableVec { data, present }) } default fn append_all(&mut self, other: &dyn Data<'a>, count: usize) -> Option> { diff --git a/src/engine/data_types/types.rs b/src/engine/data_types/types.rs index bb63e2a9..85eeaa30 100644 --- a/src/engine/data_types/types.rs +++ b/src/engine/data_types/types.rs @@ -2,10 +2,10 @@ use serde::{Deserialize, Serialize}; use crate::mem_store::*; +// WARNING: Changing this enum will break backwards compatibility with existing data #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Serialize, Deserialize)] pub enum EncodingType { Str, - OptStr, I64, U8, U16, @@ -13,6 +13,9 @@ pub enum EncodingType { U64, F64, + OptStr, + OptF64, + NullableStr, NullableI64, NullableU8, @@ -61,7 +64,9 @@ impl EncodingType { EncodingType::U16 | EncodingType::NullableU16 => EncodingType::NullableU16, EncodingType::U32 | EncodingType::NullableU32 => EncodingType::NullableU32, EncodingType::U64 | EncodingType::NullableU64 => EncodingType::NullableU64, - EncodingType::F64 | EncodingType::NullableF64 => EncodingType::NullableF64, + EncodingType::F64 | EncodingType::NullableF64 | EncodingType::OptF64 => { + EncodingType::NullableF64 + } EncodingType::Val => EncodingType::Val, _ => panic!("{:?} does not have a corresponding nullable type", &self), } @@ -70,6 +75,7 @@ impl EncodingType { pub fn nullable_fused(&self) -> EncodingType { match self { EncodingType::NullableStr => EncodingType::OptStr, + EncodingType::NullableF64 => EncodingType::OptF64, EncodingType::NullableI64 => EncodingType::I64, _ => panic!( "{:?} does not have a corresponding fused nullable type", @@ -79,27 +85,64 @@ impl EncodingType { } pub fn is_nullable(&self) -> bool { - matches!( - self, + match self { EncodingType::NullableStr - | EncodingType::NullableI64 - | EncodingType::NullableU8 - | EncodingType::NullableU16 - | EncodingType::NullableU32 - | EncodingType::NullableU64 - ) + | EncodingType::NullableI64 + | EncodingType::NullableU8 + | EncodingType::NullableU16 + | EncodingType::NullableU32 + | EncodingType::NullableU64 + | EncodingType::NullableF64 + | EncodingType::OptStr + | EncodingType::OptF64 => true, + EncodingType::Str + | EncodingType::I64 + | EncodingType::U8 + | EncodingType::U16 + | EncodingType::U32 + | EncodingType::U64 + | EncodingType::F64 + | EncodingType::USize + | EncodingType::Val + | EncodingType::Null + | EncodingType::ScalarI64 + | EncodingType::ScalarStr + | EncodingType::ScalarString + | EncodingType::ConstVal + | EncodingType::ByteSlices(_) + | EncodingType::ValRows + | EncodingType::Premerge + | EncodingType::MergeOp => false, + } } pub fn non_nullable(&self) -> EncodingType { match self { - EncodingType::NullableStr => EncodingType::Str, + EncodingType::NullableStr | EncodingType::OptStr => EncodingType::Str, EncodingType::NullableI64 => EncodingType::I64, EncodingType::NullableU8 => EncodingType::U8, EncodingType::NullableU16 => EncodingType::U16, EncodingType::NullableU32 => EncodingType::U32, EncodingType::NullableU64 => EncodingType::U64, - EncodingType::OptStr => EncodingType::Str, - _ => *self, + EncodingType::OptF64 | EncodingType::NullableF64 => EncodingType::F64, + EncodingType::Str + | EncodingType::I64 + | EncodingType::U8 + | EncodingType::U16 + | EncodingType::U32 + | EncodingType::U64 + | EncodingType::F64 + | EncodingType::USize + | EncodingType::Val + | EncodingType::Null + | EncodingType::ScalarI64 + | EncodingType::ScalarStr + | EncodingType::ScalarString + | EncodingType::ConstVal + | EncodingType::ByteSlices(_) + | EncodingType::ValRows + | EncodingType::Premerge + | EncodingType::MergeOp => *self, } } @@ -112,6 +155,8 @@ impl EncodingType { (_, EncodingType::Val) => EncodingType::Val, (EncodingType::OptStr, EncodingType::Str) => EncodingType::OptStr, (EncodingType::Str, EncodingType::OptStr) => EncodingType::OptStr, + (EncodingType::OptF64, EncodingType::F64) => EncodingType::OptF64, + (EncodingType::F64, EncodingType::OptF64) => EncodingType::OptF64, _ => unimplemented!("lub not implemented for {:?} and {:?}", self, other), } } @@ -201,6 +246,10 @@ impl Type { Type::new(BasicType::Boolean, None).mutable() } + pub fn integer() -> Type { + Type::new(BasicType::Integer, None) + } + pub fn is_encoded(&self) -> bool { self.codec.as_ref().map_or(false, |c| !c.is_identity()) } diff --git a/src/engine/data_types/vec_data.rs b/src/engine/data_types/vec_data.rs index d5521435..edcd1709 100644 --- a/src/engine/data_types/vec_data.rs +++ b/src/engine/data_types/vec_data.rs @@ -63,6 +63,19 @@ impl VecData> for OrderedFloat { fn t() -> EncodingType { EncodingType::F64 } } +impl VecData>> for Option> { + fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [Option>] where Option>: 'a { vec.cast_ref_opt_f64() } + fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec>> where Option>: 'a { vec.cast_ref_mut_opt_f64() } + fn wrap_one(value: Option>) -> RawVal { + match value { + Some(f) => RawVal::Float(f), + None => RawVal::Null, + } + } + + fn t() -> EncodingType { EncodingType::OptF64 } +} + impl VecData for usize { fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [usize] where usize: 'a { vec.cast_ref_usize() } fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec where usize: 'a { vec.cast_ref_mut_usize() } diff --git a/src/engine/execution/buffer.rs b/src/engine/execution/buffer.rs index bbdc5ed7..d399c717 100644 --- a/src/engine/execution/buffer.rs +++ b/src/engine/execution/buffer.rs @@ -71,6 +71,7 @@ impl BufferRef { pub fn string(self) -> BufferRef { self.transmute() } pub fn str<'a>(self) -> BufferRef<&'a str> { self.transmute() } pub fn opt_str<'a>(self) -> BufferRef> { self.transmute() } + pub fn opt_f64(self) -> BufferRef>> { self.transmute() } pub fn usize(self) -> BufferRef { self.transmute() } fn transmute(self) -> BufferRef { unsafe { mem::transmute(self) } } } @@ -172,6 +173,12 @@ impl From> for TypedBufferRef { } } +impl From>> for TypedBufferRef { + fn from(buffer: BufferRef>) -> TypedBufferRef { + TypedBufferRef::new(buffer.any(), EncodingType::NullableU8) + } +} + impl BufferRef> { pub fn cast_non_nullable(self) -> BufferRef { unsafe { mem::transmute(self) } } pub fn nullable_any(self) -> BufferRef> { unsafe { mem::transmute(self) } } @@ -232,6 +239,11 @@ impl TypedBufferRef { Ok(self.buffer.opt_str()) } + pub fn opt_f64(&self) -> Result>>, QueryError> { + ensure!(self.tag == EncodingType::OptF64, "{:?} != OptF64", self.tag); + Ok(self.buffer.opt_f64()) + } + pub fn i64(&self) -> Result, QueryError> { ensure!(self.tag == EncodingType::I64, "{:?} != I64", self.tag); Ok(self.buffer.i64()) diff --git a/src/engine/execution/executor.rs b/src/engine/execution/executor.rs index dd05932c..e421369f 100644 --- a/src/engine/execution/executor.rs +++ b/src/engine/execution/executor.rs @@ -525,9 +525,11 @@ impl<'a> QueryExecutor<'a> { while has_more { has_more = false; for &(op, streamable) in &self.stages[stage].ops { - self.ops[op].execute(stream && streamable, scratchpad)?; if show && iters == 0 { println!("{}", self.ops[op].display(true)); + } + self.ops[op].execute(stream && streamable, scratchpad)?; + if show && iters == 0 { for output in self.ops[op].outputs() { let data = scratchpad.get_any(output); println!("{}", data.display()); diff --git a/src/engine/execution/query_task.rs b/src/engine/execution/query_task.rs index 9c2f1844..60dc83c4 100644 --- a/src/engine/execution/query_task.rs +++ b/src/engine/execution/query_task.rs @@ -341,6 +341,7 @@ impl QueryTask { let limit = lo.limit as usize; let offset = lo.offset as usize; let count = cmp::min(limit, full_result.len() - offset); + full_result.validate().unwrap(); let mut rows = None; if self.rowformat { @@ -450,7 +451,8 @@ impl BasicTypeColumn { | EncodingType::NullableU32 | EncodingType::NullableU64 | EncodingType::NullableF64 - | EncodingType::OptStr => { + | EncodingType::OptStr + | EncodingType::OptF64 => { let mut vals = vec![]; for i in 0..data.len() { vals.push(data.get_raw(i)); diff --git a/src/engine/execution/scratchpad.rs b/src/engine/execution/scratchpad.rs index 03b05b88..b3a0b9ba 100644 --- a/src/engine/execution/scratchpad.rs +++ b/src/engine/execution/scratchpad.rs @@ -295,6 +295,10 @@ impl<'a> Scratchpad<'a> { self.null_maps[alias.i] = self.null_maps[original.i]; } + pub fn is_alias(&self, i: BufferRef, j: BufferRef) -> bool { + self.resolve(&i) == self.resolve(&j) + } + pub fn assemble_nullable( &mut self, original: BufferRef, diff --git a/src/engine/operators/bool_op.rs b/src/engine/operators/bool_op.rs index 40582887..6f4f306c 100644 --- a/src/engine/operators/bool_op.rs +++ b/src/engine/operators/bool_op.rs @@ -11,7 +11,11 @@ pub struct BooleanOperator { } impl<'a, T: BooleanOp + fmt::Debug + 'a> BooleanOperator { - pub fn compare(lhs: BufferRef, rhs: BufferRef, output: BufferRef) -> BoxedOperator<'a> { + pub fn compare( + lhs: BufferRef, + rhs: BufferRef, + output: BufferRef, + ) -> BoxedOperator<'a> { Box::new(BooleanOperator:: { lhs, rhs, @@ -23,9 +27,14 @@ impl<'a, T: BooleanOp + fmt::Debug + 'a> BooleanOperator { impl<'a, T: BooleanOp + fmt::Debug> VecOperator<'a> for BooleanOperator { fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { - let mut result = scratchpad.get_mut(self.lhs); - let rhs = scratchpad.get(self.rhs); - T::evaluate(&mut result, &rhs); + if scratchpad.is_alias(self.lhs, self.rhs) { + let mut result = scratchpad.get_mut(self.lhs); + T::evaluate_aliased(&mut result); + } else { + let mut result = scratchpad.get_mut(self.lhs); + let rhs = scratchpad.get(self.rhs); + T::evaluate(&mut result, &rhs); + } Ok(()) } @@ -33,12 +42,24 @@ impl<'a, T: BooleanOp + fmt::Debug> VecOperator<'a> for BooleanOperator { scratchpad.alias(self.lhs, self.output); } - fn inputs(&self) -> Vec> { vec![self.lhs.any(), self.rhs.any()] } - fn outputs(&self) -> Vec> { vec![self.output.any()] } - fn can_stream_input(&self, _: usize) -> bool { true } - fn can_stream_output(&self, _: usize) -> bool { true } - fn mutates(&self, i: usize) -> bool { self.lhs.i == i } - fn allocates(&self) -> bool { false } + fn inputs(&self) -> Vec> { + vec![self.lhs.any(), self.rhs.any()] + } + fn outputs(&self) -> Vec> { + vec![self.output.any()] + } + fn can_stream_input(&self, _: usize) -> bool { + true + } + fn can_stream_output(&self, _: usize) -> bool { + true + } + fn mutates(&self, i: usize) -> bool { + self.lhs.i == i + } + fn allocates(&self) -> bool { + false + } fn display_op(&self, _: bool) -> String { format!("{} {} {}", self.lhs, T::symbol(), self.rhs) @@ -47,6 +68,8 @@ impl<'a, T: BooleanOp + fmt::Debug> VecOperator<'a> for BooleanOperator { pub trait BooleanOp { fn evaluate(lhs: &mut [u8], rhs: &[u8]); + // Specialized case for when lhs refers to the same buffer as rhs + fn evaluate_aliased(lhs: &mut [u8]); fn name() -> &'static str; fn symbol() -> &'static str; } @@ -61,8 +84,14 @@ impl BooleanOp for BooleanOr { } } - fn name() -> &'static str { "bit_vec_or" } - fn symbol() -> &'static str { "|" } + fn evaluate_aliased(_lhs: &mut [u8]) {} + + fn name() -> &'static str { + "bit_vec_or" + } + fn symbol() -> &'static str { + "|" + } } #[derive(Debug)] @@ -75,7 +104,12 @@ impl BooleanOp for BooleanAnd { } } - fn name() -> &'static str { "bit_vec_and" } - fn symbol() -> &'static str { "&" } -} + fn evaluate_aliased(_lhs: &mut [u8]) {} + fn name() -> &'static str { + "bit_vec_and" + } + fn symbol() -> &'static str { + "&" + } +} diff --git a/src/engine/operators/comparator.rs b/src/engine/operators/comparator.rs index 9929244a..3c72ad1b 100644 --- a/src/engine/operators/comparator.rs +++ b/src/engine/operators/comparator.rs @@ -47,6 +47,12 @@ impl Comparator> for CmpLessThan { fn is_less_than() -> bool { true } } +impl Comparator> > for CmpLessThan { + fn cmp(left: Option>, right: Option>) -> bool { left < right } + fn cmp_eq(left: Option>, right: Option>) -> bool { left <= right } + fn is_less_than() -> bool { true } +} + impl<'a> Comparator<&'a str> for CmpLessThan { fn cmp(left: &str, right: &str) -> bool { left < right } fn cmp_eq(left: &str, right: &str) -> bool { left <= right } @@ -99,6 +105,12 @@ impl Comparator> for CmpGreaterThan { fn is_less_than() -> bool { false } } +impl Comparator> > for CmpGreaterThan { + fn cmp(left: Option>, right: Option>) -> bool { left > right } + fn cmp_eq(left: Option>, right: Option>) -> bool { left >= right } + fn is_less_than() -> bool { false } +} + impl<'a> Comparator<&'a str> for CmpGreaterThan { fn cmp(left: &str, right: &str) -> bool { left > right } fn cmp_eq(left: &str, right: &str) -> bool { left >= right } diff --git a/src/engine/operators/filter.rs b/src/engine/operators/filter.rs index 9bf47d32..c8530d82 100644 --- a/src/engine/operators/filter.rs +++ b/src/engine/operators/filter.rs @@ -1,6 +1,7 @@ use crate::bitvec::BitVec; use crate::engine::*; +/// Selects all elements in `input` where the corresponding element in `filter` is non-zero. pub struct Filter { pub input: BufferRef, pub filter: BufferRef, @@ -51,6 +52,7 @@ where } } +/// Selects all elements in `input` where the corresponding element in `filter` is non-zero and non-null. pub struct NullableFilter { pub input: BufferRef, pub filter: BufferRef>, diff --git a/src/engine/operators/filter_nullable.rs b/src/engine/operators/filter_nullable.rs new file mode 100644 index 00000000..601b4309 --- /dev/null +++ b/src/engine/operators/filter_nullable.rs @@ -0,0 +1,116 @@ +use crate::bitvec::*; +use crate::engine::*; + +/// Selects all elements in nullable `input` where the corresponding element in `filter` is non-zero. +pub struct FilterNullable { + pub input: BufferRef>, + pub filter: BufferRef, + pub output: BufferRef>, +} + +impl<'a, T: 'a> VecOperator<'a> for FilterNullable +where + T: VecData, +{ + fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { + let (data, present) = scratchpad.get_nullable(self.input); + let filter = scratchpad.get(self.filter); + let (mut filtered, mut filtered_present) = scratchpad.get_mut_nullable(self.output); + if stream { + filtered.clear(); + for p in filtered_present.iter_mut() { + *p = 0; + } + } + for (i, (d, &select)) in data.iter().zip(filter.iter()).enumerate() { + if select > 0 { + if BitVec::is_set(&&*present, i) { + filtered_present.set(filtered.len()); + } + filtered.push(*d); + } + } + Ok(()) + } + + fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) { + scratchpad.set_nullable(self.output, Vec::with_capacity(batch_size), Vec::with_capacity((batch_size + 7) / 8)); + } + + fn inputs(&self) -> Vec> { + vec![self.input.any(), self.filter.any()] + } + fn outputs(&self) -> Vec> { + vec![self.output.any()] + } + fn can_stream_input(&self, _: usize) -> bool { + true + } + fn can_stream_output(&self, _: usize) -> bool { + true + } + fn allocates(&self) -> bool { + true + } + + fn display_op(&self, _: bool) -> String { + format!("{}[{}]", self.input, self.filter) + } +} + +/// Selects all elements in nullable `input` where the corresponding element in `filter` is non-zero and non-null. +pub struct NullableFilterNullable { + pub input: BufferRef>, + pub filter: BufferRef>, + pub output: BufferRef>, +} + +impl<'a, T: 'a> VecOperator<'a> for NullableFilterNullable +where + T: VecData, +{ + fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { + let (data, input_present) = scratchpad.get_nullable(self.input); + let (filter, filter_present) = scratchpad.get_nullable(self.filter); + let (mut filtered, mut filtered_present) = scratchpad.get_mut_nullable(self.output); + if stream { + filtered.clear(); + for p in filtered_present.iter_mut() { + *p = 0; + } + } + for i in 0..data.len() { + if filter[i] > 0 && (&*filter_present).is_set(i) { + if BitVec::is_set(&&*input_present, i) { + filtered_present.set(filtered.len()); + } + filtered.push(data[i]); + } + } + Ok(()) + } + + fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) { + scratchpad.set_nullable(self.output, Vec::with_capacity(batch_size), Vec::with_capacity((batch_size + 7) / 8)); + } + + fn inputs(&self) -> Vec> { + vec![self.input.any(), self.filter.any()] + } + fn outputs(&self) -> Vec> { + vec![self.output.any()] + } + fn can_stream_input(&self, _: usize) -> bool { + true + } + fn can_stream_output(&self, _: usize) -> bool { + true + } + fn allocates(&self) -> bool { + true + } + + fn display_op(&self, _: bool) -> String { + format!("{}[{}]", self.input, self.filter) + } +} diff --git a/src/engine/operators/fuse_nulls.rs b/src/engine/operators/fuse_nulls.rs index 530ef7a6..9ca99026 100644 --- a/src/engine/operators/fuse_nulls.rs +++ b/src/engine/operators/fuse_nulls.rs @@ -1,3 +1,5 @@ +use ordered_float::OrderedFloat; + use crate::bitvec::*; use crate::engine::*; use std::i64; @@ -236,3 +238,80 @@ impl<'a, T: GenericIntVec> VecOperator<'a> for UnfuseIntNulls { } } + +pub struct FuseNullsF64 { + pub input: BufferRef>>, + pub fused: BufferRef>>, +} + +impl<'a> VecOperator<'a> for FuseNullsF64 { + fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { + let (input, present) = scratchpad.get_nullable(self.input); + let mut fused = scratchpad.get_mut(self.fused); + if stream { fused.clear(); } + for i in 0..input.len() { + if (&*present).is_set(i) { + fused.push(Some(input[i])); + } else { + fused.push(None); + } + } + Ok(()) + } + + fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) { + scratchpad.set(self.fused, Vec::with_capacity(batch_size)); + } + + fn inputs(&self) -> Vec> { vec![self.input.any()] } + fn outputs(&self) -> Vec> { vec![self.fused.any()] } + fn can_stream_input(&self, _: usize) -> bool { true } + fn can_stream_output(&self, _: usize) -> bool { true } + fn allocates(&self) -> bool { true } + + fn display_op(&self, _: bool) -> String { + format!("FuseNullsF64({})", self.fused) + } +} + +pub struct UnfuseNullsF64 { + pub fused: BufferRef>>, + pub data: BufferRef>, + pub present: BufferRef, + pub unfused: BufferRef>>, +} + +impl<'a> VecOperator<'a> for UnfuseNullsF64 { + fn execute(&mut self, stream: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { + let present = { + let fused = scratchpad.get(self.fused); + let mut data = scratchpad.get_mut(self.data); + let mut present = vec![0; fused.len() / 8 + 1]; + if stream { data.clear() } + for i in 0..fused.len() { + data.push(fused[i].unwrap_or(OrderedFloat(0.0))); + if fused[i].is_some() { + present.set(i); + } + } + present + }; + scratchpad.set(self.present, present); + Ok(()) + } + + fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) { + scratchpad.assemble_nullable(self.data, self.present, self.unfused); + scratchpad.set(self.data, Vec::with_capacity(batch_size)); + } + + fn inputs(&self) -> Vec> { vec![self.fused.any()] } + fn outputs(&self) -> Vec> { vec![self.unfused.any()] } + fn can_stream_input(&self, _: usize) -> bool { true } + fn can_stream_output(&self, _: usize) -> bool { true } + fn allocates(&self) -> bool { true } + + fn display_op(&self, _: bool) -> String { + format!("UnfuseNullsF64({})", self.fused) + } +} diff --git a/src/engine/operators/mod.rs b/src/engine/operators/mod.rs index d332fa95..887493ea 100644 --- a/src/engine/operators/mod.rs +++ b/src/engine/operators/mod.rs @@ -22,6 +22,7 @@ mod dict_lookup; mod encode_const; mod exists; mod filter; +mod filter_nullable; mod functions; mod fuse_nulls; mod get_null_map; @@ -42,6 +43,7 @@ mod merge_partitioned; mod nonzero_compact; mod nonzero_indices; mod null_vec; +mod null_vec_like; mod numeric_operators; mod parameterized_vec_vec_int_op; mod propagate_nullability; diff --git a/src/engine/operators/null_vec_like.rs b/src/engine/operators/null_vec_like.rs new file mode 100644 index 00000000..039e766a --- /dev/null +++ b/src/engine/operators/null_vec_like.rs @@ -0,0 +1,54 @@ +use crate::engine::*; +use crate::bitvec::BitVec; + +#[derive(Debug)] +pub struct NullVecLike { + pub input: BufferRef, + pub output: BufferRef, + // 0: use input length, 1: non-zero elements in u8 input, 2: non-zero non-null elements in nullalb u8 input + pub source_type: u8, +} + +impl<'a> VecOperator<'a> for NullVecLike { + fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> { + let count = match self.source_type { + 0 => scratchpad.get_any(self.input).len(), + 1 => scratchpad.get(self.input.u8()).iter().filter(|&&x| x != 0).count(), + 2 => { + let mut count = 0; + let (data, present) = scratchpad.get_nullable(self.input.nullable_u8()); + for (i, d) in data.iter().enumerate() { + if *d != 0 && BitVec::is_set(&&*present, i) { + count += 1; + } + } + count + }, + _ => unreachable!(), + }; + let mut output = scratchpad.get_any_mut(self.output); + *output.cast_ref_mut_null() = count; + Ok(()) + } + + fn init(&mut self, _: usize, _: usize, _: &mut Scratchpad<'a>) { } + + fn inputs(&self) -> Vec> { + vec![] + } + fn outputs(&self) -> Vec> { + vec![self.output.any()] + } + fn can_stream_input(&self, _: usize) -> bool { + false + } + fn can_stream_output(&self, _: usize) -> bool { + true + } + fn allocates(&self) -> bool { + false + } + fn display_op(&self, _: bool) -> String { + format!("NullVecLike({})", self.input) + } +} diff --git a/src/engine/operators/type_conversion.rs b/src/engine/operators/type_conversion.rs index 073fb326..f847ac03 100644 --- a/src/engine/operators/type_conversion.rs +++ b/src/engine/operators/type_conversion.rs @@ -157,4 +157,6 @@ impl<'a> Cast<&'a str> for Val<'a> { } } -impl<'a> Cast> for &'a str { fn cast(self) -> Option<&'a str> { Some(self) } } \ No newline at end of file +impl<'a> Cast> for &'a str { fn cast(self) -> Option<&'a str> { Some(self) } } + +impl Cast>> for OrderedFloat { fn cast(self) -> Option> { Some(self) } } \ No newline at end of file diff --git a/src/engine/operators/vector_operator.rs b/src/engine/operators/vector_operator.rs index ab17895b..80ead58d 100644 --- a/src/engine/operators/vector_operator.rs +++ b/src/engine/operators/vector_operator.rs @@ -1,7 +1,7 @@ use itertools::Itertools; use locustdb_derive::reify_types; -use regex::Regex; use ordered_float::OrderedFloat; +use regex::Regex; use crate::engine::Aggregator; use crate::engine::*; @@ -30,6 +30,7 @@ use super::dict_lookup::*; use super::encode_const::*; use super::exists::Exists; use super::filter::{Filter, NullableFilter}; +use super::filter_nullable::{FilterNullable, NullableFilterNullable}; use super::functions::*; use super::fuse_nulls::*; use super::get_null_map::GetNullMap; @@ -51,6 +52,7 @@ use super::merge_partitioned::MergePartitioned; use super::nonzero_compact::NonzeroCompact; use super::nonzero_indices::NonzeroIndices; use super::null_vec::NullVec; +use super::null_vec_like::NullVecLike; use super::numeric_operators::*; use super::parameterized_vec_vec_int_op::*; use super::partition::Partition; @@ -164,6 +166,11 @@ pub mod operator { present, nullable_data: nullable_data.nullable_str()?, })), + EncodingType::F64 => Ok(Box::new(AssembleNullable { + data: data.f64()?, + present, + nullable_data: nullable_data.nullable_f64()?, + })), _ => Err(fatal!("nullable not implemented for type {:?}", data.tag)), } } @@ -250,16 +257,23 @@ pub mod operator { input: TypedBufferRef, fused: TypedBufferRef, ) -> Result, QueryError> { - if input.tag == EncodingType::NullableI64 { - Ok(Box::new(FuseNullsI64 { + match input.tag { + EncodingType::NullableI64 => Ok(Box::new(FuseNullsI64 { input: input.nullable_i64()?, fused: fused.i64()?, - })) - } else { - Ok(Box::new(FuseNullsStr { + })), + EncodingType::NullableStr => Ok(Box::new(FuseNullsStr { input: input.nullable_str()?, fused: fused.opt_str()?, - })) + })), + EncodingType::NullableF64 => Ok(Box::new(FuseNullsF64 { + input: input.nullable_f64()?, + fused: fused.opt_f64()?, + })), + _ => Err(fatal!( + "fuse_nulls not implemented for type {:?}", + input.tag + )), } } @@ -345,19 +359,28 @@ pub mod operator { present: BufferRef, unfused: TypedBufferRef, ) -> Result, QueryError> { - if fused.tag == EncodingType::I64 { - Ok(Box::new(UnfuseNullsI64 { + match fused.tag { + EncodingType::I64 => Ok(Box::new(UnfuseNullsI64 { fused: fused.i64()?, present, unfused: unfused.nullable_i64()?, - })) - } else { - Ok(Box::new(UnfuseNullsStr { + })), + EncodingType::OptStr => Ok(Box::new(UnfuseNullsStr { fused: fused.opt_str()?, data: data.str()?, present, unfused: unfused.nullable_str()?, - })) + })), + EncodingType::OptF64 => Ok(Box::new(UnfuseNullsF64 { + fused: fused.opt_f64()?, + data: data.f64()?, + present, + unfused: unfused.nullable_f64()?, + })), + _ => Err(fatal!( + "unfuse_nulls not implemented for type {:?}", + fused.tag + )), } } @@ -492,10 +515,18 @@ pub mod operator { filter: BufferRef, output: TypedBufferRef, ) -> Result, QueryError> { - reify_types! { - "filter"; - input, output: PrimitiveUSize; - Ok(Box::new(Filter { input, filter, output })) + if input.is_nullable() { + reify_types! { + "filter_nullable"; + input, output: NullablePrimitive; + Ok(Box::new(FilterNullable { input, filter, output })) + } + } else { + reify_types! { + "filter"; + input, output: PrimitiveUSize; + Ok(Box::new(Filter { input, filter, output })) + } } } @@ -504,10 +535,18 @@ pub mod operator { filter: BufferRef>, output: TypedBufferRef, ) -> Result, QueryError> { - reify_types! { - "nullable_filter"; - input, output: PrimitiveUSize; - Ok(Box::new(NullableFilter { input, filter, output })) + if input.is_nullable() { + reify_types! { + "nullable_filter_nullable"; + input, output: NullablePrimitive; + Ok(Box::new(NullableFilterNullable { input, filter, output })) + } + } else { + reify_types! { + "nullable_filter"; + input, output: PrimitiveUSize; + Ok(Box::new(NullableFilter { input, filter, output })) + } } } @@ -565,6 +604,10 @@ pub mod operator { Box::new(NullVec { len, output }) } + pub fn null_vec_like<'a>(input: BufferRef, output: BufferRef, source_type: u8) -> BoxedOperator<'a> { + Box::new(NullVecLike { input, output, source_type }) + } + pub fn constant_expand<'a>( val: i64, len: usize, @@ -1104,6 +1147,12 @@ pub mod operator { output: output.opt_str()?, })); } + if input.tag == EncodingType::F64 && output.tag == EncodingType::OptF64 { + return Ok(Box::new(TypeConversionOperator { + input: input.f64()?, + output: output.opt_f64()?, + })); + } reify_types! { "type_conversion"; input: Integer, output: Integer; diff --git a/src/engine/planning/query_plan.rs b/src/engine/planning/query_plan.rs index b50296ff..ce73ad7f 100644 --- a/src/engine/planning/query_plan.rs +++ b/src/engine/planning/query_plan.rs @@ -66,8 +66,8 @@ pub enum QueryPlan { #[output(t = "base=data;null=_always")] nullable: TypedBufferRef, }, - /// Converts NullableI64 or NullableStr into a representation where nulls are encoded as part - /// of the data (i64 with i64::MIN representing null for NullableI64, and Option<&str> for NullableStr). + /// Converts NullableI64 or NullableStr NullableF64 into a representation where nulls are encoded as part + /// of the data (i64 with i64::MIN representing null for NullableI64, Option<&str> for NullableStr, and Option for NullableF64). FuseNulls { nullable: TypedBufferRef, #[output(t = "base=nullable;null=_fused")] @@ -478,6 +478,13 @@ pub enum QueryPlan { #[output(t = "base=provided")] nulls: TypedBufferRef, }, + NullVecLike { + plan: TypedBufferRef, + // 0: use input length, 1: non-zero elements in u8 input, 2: non-zero non-null elements in nullalb u8 input + source_type: u8, + #[output(t = "base=provided")] + nulls: TypedBufferRef, + }, ScalarI64 { value: i64, hide_value: bool, @@ -786,18 +793,22 @@ fn function2_registry() -> HashMap> { ( Func2Type::Multiply, vec![ - Function2::integer_op(Box::new(|qp, lhs, rhs| { - qp.checked_multiply(lhs, rhs) - })), - Function2::float_op(Box::new(|qp, lhs, rhs| { - qp.multiply(lhs, rhs, EncodingType::F64) - }), BasicType::Integer, BasicType::Float), - Function2::float_op(Box::new(|qp, lhs, rhs| { - qp.multiply(lhs, rhs, EncodingType::F64) - }), BasicType::Float, BasicType::Integer), - Function2::float_op(Box::new(|qp, lhs, rhs| { - qp.multiply(lhs, rhs, EncodingType::F64) - }), BasicType::Float, BasicType::Float), + Function2::integer_op(Box::new(|qp, lhs, rhs| qp.checked_multiply(lhs, rhs))), + Function2::float_op( + Box::new(|qp, lhs, rhs| qp.multiply(lhs, rhs, EncodingType::F64)), + BasicType::Integer, + BasicType::Float, + ), + Function2::float_op( + Box::new(|qp, lhs, rhs| qp.multiply(lhs, rhs, EncodingType::F64)), + BasicType::Float, + BasicType::Integer, + ), + Function2::float_op( + Box::new(|qp, lhs, rhs| qp.multiply(lhs, rhs, EncodingType::F64)), + BasicType::Float, + BasicType::Float, + ), ], ), ( @@ -924,10 +935,21 @@ impl QueryPlan { }; (plan, t) } - None => ( - planner.null_vec(column_len, EncodingType::Null), - Type::new(BasicType::Null, None), - ), + None => { + let plan = match filter { + Filter::None => planner.null_vec(column_len, EncodingType::Null), + Filter::U8(filter) => { + planner.null_vec_like(filter.into(), 1, EncodingType::Null) + } + Filter::NullableU8(filter) => { + planner.null_vec_like(filter.into(), 2, EncodingType::Null) + } + Filter::Indices(filter) => { + planner.null_vec_like(filter.into(), 0, EncodingType::Null) + } + }; + (plan, Type::new(BasicType::Null, None)) + } }, Func2(Or, ref lhs, ref rhs) => { let (plan_lhs, type_lhs) = @@ -936,11 +958,25 @@ impl QueryPlan { QueryPlan::compile_expr(rhs, filter, columns, column_len, planner)?; if type_lhs.decoded != BasicType::Boolean || type_rhs.decoded != BasicType::Boolean { + log::info!( + "Found {:?} -> ({:?}: {:?}) OR {:?} -> ({:?}: {:?}), expected bool OR bool", + lhs, + plan_lhs, + type_lhs, + rhs, + plan_rhs, + type_rhs, + ); bail!( QueryError::TypeError, - "Found {} OR {}, expected bool OR bool" + "Found ({:?}: {:?}) OR ({:?}: {:?}), expected bool OR bool", + plan_lhs, + type_lhs, + plan_rhs, + type_rhs, ) } + log::info!("{:?} OR {:?}", plan_lhs, plan_rhs); (planner.or(plan_lhs, plan_rhs), Type::bit_vec()) } Func2(And, ref lhs, ref rhs) => { @@ -952,7 +988,9 @@ impl QueryPlan { { bail!( QueryError::TypeError, - "Found {} AND {}, expected bool AND bool" + "Found {:?} AND {:?}, expected bool AND bool", + type_lhs.decoded, + type_rhs.decoded, ) } (planner.and(plan_lhs, plan_rhs), Type::bit_vec()) @@ -1127,7 +1165,7 @@ impl QueryPlan { Func1(ftype, ref inner) => { let (plan, t) = QueryPlan::compile_expr(inner, filter, columns, column_len, planner)?; - let plan = match ftype { + match ftype { Func1Type::ToYear => { let decoded = match t.codec.clone() { Some(codec) => codec.decode(plan, planner), @@ -1140,7 +1178,7 @@ impl QueryPlan { &t ) } - planner.to_year(decoded) + (planner.to_year(decoded), Type::integer()) } Func1Type::Length => { let decoded = match t.codec.clone() { @@ -1154,7 +1192,7 @@ impl QueryPlan { &t ) } - planner.length(decoded.str()?).into() + (planner.length(decoded.str()?).into(), Type::integer()) } Func1Type::Not => { let decoded = match t.codec.clone() { @@ -1168,27 +1206,39 @@ impl QueryPlan { &t ) } - planner.not(decoded.u8()?).into() + (planner.not(decoded.u8()?).into(), Type::bit_vec()) } Func1Type::IsNull => { if plan.is_nullable() { - planner.is_null(plan.nullable_any()?).into() + ( + planner.is_null(plan.nullable_any()?).into(), + Type::bit_vec(), + ) } else { - planner.constant_expand( - (false as u8) as i64, - column_len, - EncodingType::U8, + ( + planner.constant_expand( + (false as u8) as i64, + column_len, + EncodingType::U8, + ), + Type::bit_vec(), ) } } Func1Type::IsNotNull => { if plan.is_nullable() { - planner.is_not_null(plan.nullable_any()?).into() + ( + planner.is_not_null(plan.nullable_any()?).into(), + Type::bit_vec(), + ) } else { - planner.constant_expand( - (true as u8) as i64, - column_len, - EncodingType::U8, + ( + planner.constant_expand( + (true as u8) as i64, + column_len, + EncodingType::U8, + ), + Type::bit_vec(), ) } } @@ -1198,8 +1248,7 @@ impl QueryPlan { "Unary minus not implemented for arbitrary expressions." ) } - }; - (plan, t.decoded()) + } } Const(RawVal::Int(i)) => ( planner.scalar_i64(i, false).into(), @@ -1632,6 +1681,11 @@ pub(super) fn prepare<'a>( scalar_str, } => operator::scalar_str(value, pinned_string, scalar_str), QueryPlan::NullVec { len, nulls } => operator::null_vec(len, nulls.any()), + QueryPlan::NullVecLike { + plan, + source_type, + nulls, + } => operator::null_vec_like(plan.any(), nulls.any(), source_type), QueryPlan::ConstantExpand { value, len, @@ -1710,11 +1764,13 @@ pub(super) fn prepare<'a>( max_index, aggregator, aggregate, - } => if aggregate.tag == EncodingType::F64 { - operator::aggregate_f64(plan, grouping_key, max_index, aggregator, aggregate)? - } else { - operator::aggregate(plan, grouping_key, max_index, aggregator, aggregate)? - }, + } => { + if aggregate.tag == EncodingType::F64 { + operator::aggregate_f64(plan, grouping_key, max_index, aggregator, aggregate)? + } else { + operator::aggregate(plan, grouping_key, max_index, aggregator, aggregate)? + } + } QueryPlan::CheckedAggregate { plan, grouping_key, @@ -1817,9 +1873,7 @@ pub(super) fn prepare<'a>( present, difference, } => operator::nullable_checked_subtraction(lhs, rhs, present, difference)?, - QueryPlan::Multiply { lhs, rhs, product } => { - operator::multiplication(lhs, rhs, product)? - } + QueryPlan::Multiply { lhs, rhs, product } => operator::multiplication(lhs, rhs, product)?, QueryPlan::CheckedMultiply { lhs, rhs, product } => { operator::checked_multiplication(lhs, rhs, product.i64()?)? } @@ -1948,4 +2002,4 @@ pub(super) fn prepare<'a>( }; result.push(operation); Ok(result.last_buffer()) -} \ No newline at end of file +} diff --git a/src/mem_store/column.rs b/src/mem_store/column.rs index b03c896e..cd9971ad 100644 --- a/src/mem_store/column.rs +++ b/src/mem_store/column.rs @@ -163,12 +163,13 @@ impl Column { codec_tree.codec = signature; codec_tree.size_bytes += size_bytes; codec_tree.rows += self.len; - if depth > 2 && self.data.len() > 1 { + if depth > 2 { for (i, d) in self.data.iter().enumerate() { if codec_tree.sections.len() == i { codec_tree.sections.push(MemTreeSection { id: i, size_bytes: 0, + datatype: format!("{:?}", d.encoding_type()), }); } codec_tree.sections[i].size_bytes += d.heap_size_of_children(); diff --git a/src/mem_store/tree.rs b/src/mem_store/tree.rs index 8835fe88..729881cc 100644 --- a/src/mem_store/tree.rs +++ b/src/mem_store/tree.rs @@ -36,6 +36,7 @@ pub struct MemTreeEncoding { pub struct MemTreeSection { pub id: usize, pub size_bytes: usize, + pub datatype: String, } impl MemTreeTable { @@ -151,7 +152,7 @@ impl fmt::Display for MemTreeEncoding { impl fmt::Display for MemTreeSection { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, ".{} {:30} {:>8}", self.id, "", format!("{:.2}", bite(self.size_bytes))) + write!(f, ".{} {:30} {:>8}", self.id, self.datatype, format!("{:.2}", bite(self.size_bytes))) } } diff --git a/test_data/edge_cases.csv b/test_data/edge_cases.csv index 2d86d76b..a52843dd 100644 --- a/test_data/edge_cases.csv +++ b/test_data/edge_cases.csv @@ -1,11 +1,11 @@ -u8_offset_encoded,non_dense_ints,enum,string_packed,constant0,constant0_2,negative,id,nullable_int,nullable_int2,country,largenum,float -256,0,aa,xyz,0,0,-199,0,-1,,Germany,-9223372036854775807,0.123412 -258,2,aa,abc,0,0,39,1,-40,-40,USA,9223372036854775807,3e-4 -259,3,aa,axz,0,0,-100,2,,,France,9223372036854775807,-124.0 -257,1,bb,AXY,0,0,34,3,,0,,9223372036854775807,3.15159 -275,4,bb,azy,0,0,4031,4,10,9,France,-9223372036854775807,0.1234e30 -500,0,aa,$sss,0,0,32,5,,6,,9223372036854775807,1e-6 -343,2,cc,asd,0,0,-130,6,,,Turkey,-9223372036854775807,0.0 -432,1,aa,_f,0,0,-120,7,20,,,9223372036854775807,0.000001 -511,2,cc,t,0,0,4010,8,,1,,-9223372036854775807,-1.0 -500,3,bb,😈,0,0,-40,9,13,14,Germany,9223372036854775807,1234124.51325 \ No newline at end of file +u8_offset_encoded,non_dense_ints,enum,string_packed,constant0,constant0_2,negative,id,nullable_int,nullable_int2,country,largenum,float,nullable_float +256,0,aa,xyz,0,0,-199,0,-1,,Germany,-9223372036854775807,0.123412, +258,2,aa,abc,0,0,39,1,-40,-40,USA,9223372036854775807,3e-4, +259,3,aa,axz,0,0,-100,2,,,France,9223372036854775807,-124.0,0.4 +257,1,bb,AXY,0,0,34,3,,0,,9223372036854775807,3.15159, +275,4,bb,azy,0,0,4031,4,10,9,France,-9223372036854775807,0.1234e30, +500,0,aa,$sss,0,0,32,5,,6,,9223372036854775807,1e-6, +343,2,cc,asd,0,0,-130,6,,,Turkey,-9223372036854775807,0.0,1e-32 +432,1,aa,_f,0,0,-120,7,20,,,9223372036854775807,0.000001, +511,2,cc,t,0,0,4010,8,,1,,-9223372036854775807,-1.0, +500,3,bb,😈,0,0,-40,9,13,14,Germany,9223372036854775807,1234124.51325,1.123124e30 \ No newline at end of file diff --git a/tests/query_tests.rs b/tests/query_tests.rs index c9b12ad5..350acdd1 100644 --- a/tests/query_tests.rs +++ b/tests/query_tests.rs @@ -21,7 +21,7 @@ fn test_query(query: &str, expected_rows: &[Vec]) { .load_csv(LoadOptions::new("test_data/tiny.csv", "default").with_partition_size(40)), ); let result = if env::var("DEBUG_TESTS").is_ok() { - block_on(locustdb.run_query(query, true, true, vec![0, 1, 2])).unwrap() + block_on(locustdb.run_query(query, true, true, vec![0, 1, 2])).unwrap() } else { block_on(locustdb.run_query(query, true, true, vec![])).unwrap() }; @@ -43,12 +43,22 @@ fn test_query_ec(query: &str, expected_rows: &[Vec]) { .allow_nulls_all_columns(), ), ); - let result = if env::var("DEBUG_TESTS").is_ok() { - block_on(locustdb.run_query(query, false, true, vec![0, 1, 2, 3])).unwrap() + let result1; + let result2 = if env::var("DEBUG_TESTS").is_ok() { + result1 = block_on(locustdb.run_query(query, false, true, vec![0, 1, 2, 3])).unwrap(); + locustdb.force_flush(); + block_on(locustdb.run_query(query, false, true, vec![0, 1, 2, 3])).unwrap() } else { + result1 = block_on(locustdb.run_query(query, false, true, vec![])).unwrap(); + locustdb.force_flush(); block_on(locustdb.run_query(query, false, true, vec![])).unwrap() }; - assert_eq!(result.unwrap().rows.unwrap(), expected_rows); + assert_eq!( + result1.as_ref().unwrap().rows, + result2.as_ref().unwrap().rows, + "Query results differ after flush" + ); + assert_eq!(result1.unwrap().rows.unwrap(), expected_rows); } fn test_query_ec_err(query: &str, _expected_err: QueryError) { @@ -303,7 +313,7 @@ fn test_sum() { &[ vec![Str("aa"), Float(OrderedFloat(-123.87628600000001))], vec![Str("bb"), Float(OrderedFloat(1.234e29))], - vec![Str("cc"), Float(OrderedFloat(-1.0))] + vec![Str("cc"), Float(OrderedFloat(-1.0))], ], ); } @@ -431,7 +441,6 @@ fn test_not_equals_2() { ) } - #[test] fn test_order_by_float() { test_query_ec( @@ -550,9 +559,21 @@ fn test_min_max() { test_query_ec( "select enum, max(float), min(float) from default;", &[ - vec![Str("aa"), Float(OrderedFloat(0.123412)), Float(OrderedFloat(-124.0))], - vec![Str("bb"), Float(OrderedFloat(1.234e29)), Float(OrderedFloat(3.15159))], - vec![Str("cc"), Float(OrderedFloat(0.0)), Float(OrderedFloat(-1.0))] + vec![ + Str("aa"), + Float(OrderedFloat(0.123412)), + Float(OrderedFloat(-124.0)), + ], + vec![ + Str("bb"), + Float(OrderedFloat(1.234e29)), + Float(OrderedFloat(3.15159)), + ], + vec![ + Str("cc"), + Float(OrderedFloat(0.0)), + Float(OrderedFloat(-1.0)), + ], ], ); } @@ -1021,12 +1042,26 @@ fn test_column_with_null_partitions() { ), )], })); - println!("{:?}", block_on(locustdb.run_query("SELECT * FROM test;", true, true, vec![])).unwrap().unwrap()); + println!( + "{:?}", + block_on(locustdb.run_query("SELECT * FROM test;", true, true, vec![])) + .unwrap() + .unwrap() + ); let query = "SELECT partition_sparse FROM test;"; let result = block_on(locustdb.run_query(query, true, true, vec![])) .unwrap() .unwrap(); - assert_eq!(result.rows.as_ref().unwrap().iter().filter(|&x| x == &[Null]).count(), 13); + assert_eq!( + result + .rows + .as_ref() + .unwrap() + .iter() + .filter(|&x| x == &[Null]) + .count(), + 13 + ); assert_eq!( result .rows @@ -1069,7 +1104,7 @@ fn test_group_by_string() { })); let query = "SELECT scrambled, count(1) FROM test LIMIT 5;"; - let result = block_on(locustdb.run_query(query, true, true, vec![])) + let result = block_on(locustdb.run_query(query, true, true, vec![])) .unwrap() .unwrap(); let expected_rows = vec![ @@ -1135,6 +1170,62 @@ fn test_group_by_float() { ); } +#[test] +fn test_or_nullcheck_and_filter() { + test_query_ec( + "SELECT nullable_int2, float FROM default WHERE nullable_int2 IS NOT NULL OR float IS NOT NULL ORDER BY id LIMIT 100000;", + &[ + vec![Null, Float(OrderedFloat(0.123412))], + vec![Int(-40), Float(OrderedFloat(0.0003))], + vec![Null, Float(OrderedFloat(-124.0))], + vec![Int(0), Float(OrderedFloat(3.15159))], + vec![Int(9), Float(OrderedFloat(1.234e29))], + vec![Int(6), Float(OrderedFloat(1e-6))], + vec![Null, Float(OrderedFloat(0.0))], + vec![Null, Float(OrderedFloat(1e-6))], + vec![Int(1), Float(OrderedFloat(-1.0))], + vec![Int(14), Float(OrderedFloat(1234124.51325))] + ] + ); + // Tests aliasing of OR inputs (both resolve to same constant expand) + test_query_ec( + "SELECT id FROM default WHERE id IS NULL OR float IS NULL ORDER BY id LIMIT 100000;", + &[], + ); + test_query_ec( + "SELECT nullable_int2, nullable_float FROM default WHERE nullable_int2 IS NOT NULL AND (nullable_float IS NOT NULL) ORDER BY id LIMIT 100000;", + &[ + vec![Int(14), Float(OrderedFloat(1.123124e30))], + ] + ); + test_query_ec( + "SELECT nullable_int2, nullable_float FROM default WHERE nullable_int2 IS NOT NULL AND (nullable_float IS NOT NULL) LIMIT 100000;", + &[ + vec![Int(14), Float(OrderedFloat(1.123124e30))], + ] + ); +} + +#[test] +fn test_select_0_of_everything() { + test_query_ec("SELECT * FROM default LIMIT 0;", &[]) +} + +#[test] +fn test_filter_nonexistant_columns() { + test_query_ec( + "SELECT nullable_int2, lolololol, also_doesnt_exist FROM default WHERE nullable_int2 IS NOT NULL;", + &[ + vec![Int(-40), Null, Null], + vec![Int(0), Null, Null], + vec![Int(9), Null, Null], + vec![Int(6), Null, Null], + vec![Int(1), Null, Null], + vec![Int(14), Null, Null] + ], + ) +} + #[test] fn test_restore_from_disk() { use std::{thread, time}; @@ -1201,4 +1292,4 @@ fn test_colnames() { "SELECT \"u8_offset_encoded\" FROM \"default\" WHERE \"u8_offset_encoded\" = 256;", vec!["u8_offset_encoded".to_string()], ); -} \ No newline at end of file +}