Skip to content

Commit

Permalink
Fix aggregation of null/nullable columns (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter authored Jul 21, 2024
1 parent b629248 commit 22d3b33
Show file tree
Hide file tree
Showing 40 changed files with 1,539 additions and 633 deletions.
1 change: 0 additions & 1 deletion locustdb-derive/src/ast_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ fn parse_type(field_ident: &Ident, type_def: String) -> Option<(Expr, Option<FnA
static ref T: Regex = Regex::new(r#"t = "(.*)""#).unwrap();
static ref BASE: Regex = Regex::new(r#"base=([^;]*)"#).unwrap();
static ref NULL: Regex = Regex::new(r#"null=([^;]*)"#).unwrap();

}

if let Some(t) = T.captures(&type_def) {
Expand Down
11 changes: 4 additions & 7 deletions locustdb-derive/src/reify_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ fn types(t: &Ident) -> Option<Vec<Type>> {
"NullableInteger" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64]),
"NullableNumber" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64]),
"NullableFloat" => Some(vec![Type::NullableF64]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]),
"PrimitiveOrVal" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64, Type::Val]),
"VecData" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::OptF64, Type::Val, Type::Bitvec]),
"VecDataNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::OptF64, Type::Val, Type::Bitvec]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr]),
"PrimitiveOrVal" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::Val]),
"VecData" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::Val, Type::Bitvec]),
"VecDataNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::USize, Type::Str, Type::OptStr, Type::Val, Type::Bitvec]),
"NullablePrimitive" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64, Type::NullableStr]),
"PrimitiveUSize" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::USize]),
"PrimitiveNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::Str]),
Expand All @@ -237,7 +237,6 @@ enum Type {
Bitvec,

OptStr, // Option<&str>, used when sorting instead of representation of raw valls + present bit vec
OptF64, // Option<OrderedFloat<f64>>, used when sorting

NullableU8,
NullableU16,
Expand Down Expand Up @@ -273,7 +272,6 @@ impl Type {
Type::Val => parse_quote!(EncodingType::Val),
Type::Bitvec => parse_quote!(EncodingType::Bitvec),
Type::OptStr => parse_quote!(EncodingType::OptStr),
Type::OptF64 => parse_quote!(EncodingType::OptF64),
Type::NullableU8 => parse_quote!(EncodingType::NullableU8),
Type::NullableU16 => parse_quote!(EncodingType::NullableU16),
Type::NullableU32 => parse_quote!(EncodingType::NullableU32),
Expand Down Expand Up @@ -306,7 +304,6 @@ impl Type {
Type::Val => parse_quote!( let #variable = #variable.buffer.val(); ),
Type::Bitvec => parse_quote!( let #variable = #variable.buffer.u8(); ),
Type::OptStr => parse_quote!( let #variable = #variable.buffer.opt_str(); ),
Type::OptF64 => parse_quote!( let #variable = #variable.buffer.opt_f64(); ),
Type::NullableU8 => parse_quote!( let #variable = #variable.buffer.nullable_u8(); ),
Type::NullableU16 => parse_quote!( let #variable = #variable.buffer.nullable_u16(); ),
Type::NullableU32 => parse_quote!( let #variable = #variable.buffer.nullable_u32(); ),
Expand Down
8 changes: 8 additions & 0 deletions src/bitvec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub trait BitVecMut {
fn set(&mut self, index: usize);
fn unset(&mut self, index: usize);
}

pub trait BitVec {
Expand All @@ -14,6 +15,13 @@ impl BitVecMut for Vec<u8> {
}
self[slot] |= 1 << (index as u8 & 7)
}

fn unset(&mut self, index: usize) {
let slot = index >> 3;
if slot < self.len() {
self[slot] &= 0xff ^ (1 << (index as u8 & 7));
}
}
}

impl BitVec for Vec<u8> {
Expand Down
33 changes: 0 additions & 33 deletions src/engine/data_types/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,24 +443,6 @@ impl<'a> Data<'a> for Vec<OrderedFloat<f64>> {
}
}

impl<'a> Data<'a> for Vec<Option<OrderedFloat<f64>>> {
fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
self
}
fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec<Option<OrderedFloat<f64>>> {
self
}
fn to_mixed(&self) -> Vec<Val<'a>> {
self.iter()
.map(|s| match s {
None => Val::Null,
Some(s) => Val::Float(*s)
})
.collect()
}
}


impl<'a> Data<'a> for Vec<MergeOp> {
fn cast_ref_merge_op(&self) -> &[MergeOp] {
self
Expand Down Expand Up @@ -643,21 +625,6 @@ impl<'a> Data<'a> for &'a [OrderedFloat<f64>] {
}
}

impl<'a> Data<'a> for &'a [Option<OrderedFloat<f64>>] {
fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
self
}

fn to_mixed(&self) -> Vec<Val<'a>> {
self.iter()
.map(|s| match s {
None => Val::Null,
Some(s) => Val::Float(*s),
})
.collect()
}
}

impl<'a> Data<'a> for &'a [MergeOp] {
fn cast_ref_merge_op(&self) -> &[MergeOp] {
self
Expand Down
39 changes: 18 additions & 21 deletions src/engine/data_types/types.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};

use crate::mem_store::*;

// WARNING: Changing this enum will break backwards compatibility with existing data
// Special NaN value that we use to represent NULLs in the data.
pub const F64_NULL: OrderedFloat<f64> = OrderedFloat(unsafe { std::mem::transmute::<u64, f64>(0x7ffa_aaaa_aaaa_aaaau64) });
pub const I64_NULL: i64 = i64::MAX;



#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Serialize, Deserialize)]
pub enum EncodingType {
// Straightforward vector or slice of basic types
Str,
I64,
I64, // Can also represent null values as i64::MAX
U8,
U16,
U32,
U64,
F64,
F64, // Can also represent null values as F64_NULL NaN value
Val,
USize,
Bitvec, // this has the same representation as U8, but will have 1/8th the length
Expand All @@ -28,7 +34,6 @@ pub enum EncodingType {

// Vector of optional basic types. Used for grouping or sorting
OptStr,
OptF64,

// Represents null column as single `usize` value that is the length of the column
Null,
Expand All @@ -55,7 +60,7 @@ impl EncodingType {
EncodingType::Str => BasicType::String,
EncodingType::I64 => BasicType::Integer,
EncodingType::F64 => BasicType::Float,
EncodingType::NullableStr => BasicType::NullableString,
EncodingType::NullableStr | EncodingType::OptStr => BasicType::NullableString,
EncodingType::NullableI64 => BasicType::NullableInteger,
EncodingType::NullableF64 => BasicType::NullableFloat,
EncodingType::Val => BasicType::Val,
Expand All @@ -78,16 +83,15 @@ impl EncodingType {
EncodingType::F64 | EncodingType::NullableF64 => EncodingType::NullableF64,
EncodingType::Val => EncodingType::Val,
EncodingType::OptStr => EncodingType::OptStr,
EncodingType::OptF64 => EncodingType::OptF64,
_ => panic!("{:?} does not have a corresponding nullable type", &self),
}
}

pub fn nullable_fused(&self) -> EncodingType {
match self {
EncodingType::NullableStr => EncodingType::OptStr,
EncodingType::NullableF64 => EncodingType::OptF64,
EncodingType::NullableI64 => EncodingType::I64,
EncodingType::NullableF64 => EncodingType::F64,
_ => panic!(
"{:?} does not have a corresponding fused nullable type",
&self
Expand All @@ -107,7 +111,6 @@ impl EncodingType {
| EncodingType::NullableU64
| EncodingType::NullableF64 => true,
EncodingType::OptStr
| EncodingType::OptF64
| EncodingType::Str
| EncodingType::I64
| EncodingType::U8
Expand All @@ -134,7 +137,7 @@ impl EncodingType {
/// Returns whether the encoding type can represent null values without an associated null map.
pub fn is_naturally_nullable(&self) -> bool {
match self {
EncodingType::Val | EncodingType::OptStr | EncodingType::OptF64 => true,
EncodingType::Val | EncodingType::OptStr => true,
EncodingType::NullableStr
| EncodingType::NullableI64
| EncodingType::NullableU8
Expand Down Expand Up @@ -172,7 +175,7 @@ impl EncodingType {
EncodingType::NullableU16 => EncodingType::U16,
EncodingType::NullableU32 => EncodingType::U32,
EncodingType::NullableU64 => EncodingType::U64,
EncodingType::OptF64 | EncodingType::NullableF64 => EncodingType::F64,
EncodingType::NullableF64 => EncodingType::F64,
EncodingType::Str
| EncodingType::I64
| EncodingType::U8
Expand Down Expand Up @@ -206,7 +209,6 @@ impl EncodingType {
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr
| EncodingType::OptF64
| EncodingType::Str
| EncodingType::I64
| EncodingType::U8
Expand Down Expand Up @@ -240,7 +242,6 @@ impl EncodingType {
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr
| EncodingType::OptF64
| EncodingType::Str
| EncodingType::I64
| EncodingType::U8
Expand Down Expand Up @@ -269,14 +270,11 @@ impl EncodingType {
*self
} else {
match (self, other) {
(EncodingType::Val, _) => EncodingType::Val,
(_, EncodingType::Val) => EncodingType::Val,
(EncodingType::F64, EncodingType::I64) => EncodingType::Val,
(EncodingType::I64, EncodingType::F64) => EncodingType::Val,
(EncodingType::OptStr, EncodingType::Str) => EncodingType::OptStr,
(EncodingType::Str, EncodingType::OptStr) => EncodingType::OptStr,
(EncodingType::OptF64, EncodingType::F64) => EncodingType::OptF64,
(EncodingType::F64, EncodingType::OptF64) => EncodingType::OptF64,
(EncodingType::Val, _) | (_, EncodingType::Val) => EncodingType::Val,
(EncodingType::F64, EncodingType::I64) | (EncodingType::I64, EncodingType::F64) => EncodingType::Val,
(EncodingType::F64, EncodingType::Null) | (EncodingType::Null, EncodingType::F64) => EncodingType::F64,
(EncodingType::I64, EncodingType::Null) | (EncodingType::Null, EncodingType::I64) => EncodingType::I64,
(EncodingType::OptStr, EncodingType::Str) | (EncodingType::Str, EncodingType::OptStr) => EncodingType::OptStr,
_ => unimplemented!("lub not implemented for {:?} and {:?}", self, other),
}
}
Expand All @@ -302,7 +300,6 @@ impl EncodingType {
EncodingType::NullableU64 => 15,
EncodingType::NullableF64 => 16,
EncodingType::OptStr => 17,
EncodingType::OptF64 => 18,
EncodingType::Null => 19,
EncodingType::ScalarF64 => 20,
EncodingType::ScalarI64 => 21,
Expand Down
19 changes: 3 additions & 16 deletions src/engine/data_types/vec_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::mem_store::value::Val;
use crate::engine::data_types::*;


pub trait VecData<T>: PartialEq + Ord + Copy + Debug + Sync + Send {
pub trait VecData<T>: PartialEq + Copy + Debug + Sync + Send {
fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [T] where T: 'a;
fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<T> where T: 'a;
fn wrap_one(_value: T) -> RawVal { panic!("Can't wrap scalar of type {:?}", Self::t()) }
Expand Down Expand Up @@ -46,7 +46,7 @@ impl VecData<u32> for u32 {
impl VecData<i64> for i64 {
fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [i64] where i64: 'a { vec.cast_ref_i64() }
fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<i64> where i64: 'a { vec.cast_ref_mut_i64() }
fn wrap_one(value: i64) -> RawVal { if value == i64::MIN { RawVal::Null } else { RawVal::Int(value) } }
fn wrap_one(value: i64) -> RawVal { if value == I64_NULL { RawVal::Null } else { RawVal::Int(value) } }
fn t() -> EncodingType { EncodingType::I64 }
}

Expand All @@ -60,23 +60,10 @@ impl VecData<u64> for u64 {
impl VecData<OrderedFloat<f64>> for OrderedFloat<f64> {
fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [OrderedFloat<f64>] where OrderedFloat<f64>: 'a { vec.cast_ref_f64() }
fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<OrderedFloat<f64>> where OrderedFloat<f64>: 'a { vec.cast_ref_mut_f64() }
fn wrap_one(value: OrderedFloat<f64>) -> RawVal { RawVal::Float(value) }
fn wrap_one(value: OrderedFloat<f64>) -> RawVal { if value.to_bits() == F64_NULL.to_bits() { RawVal::Null } else { RawVal::Float(value) } }
fn t() -> EncodingType { EncodingType::F64 }
}

impl VecData<Option<OrderedFloat<f64>>> for Option<OrderedFloat<f64>> {
fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [Option<OrderedFloat<f64>>] where Option<OrderedFloat<f64>>: 'a { vec.cast_ref_opt_f64() }
fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<Option<OrderedFloat<f64>>> where Option<OrderedFloat<f64>>: 'a { vec.cast_ref_mut_opt_f64() }
fn wrap_one(value: Option<OrderedFloat<f64>>) -> RawVal {
match value {
Some(f) => RawVal::Float(f),
None => RawVal::Null,
}
}

fn t() -> EncodingType { EncodingType::OptF64 }
}

impl VecData<usize> for usize {
fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [usize] where usize: 'a { vec.cast_ref_usize() }
fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<usize> where usize: 'a { vec.cast_ref_mut_usize() }
Expand Down
9 changes: 7 additions & 2 deletions src/engine/execution/batch_merging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct BatchResult<'a> {
pub projection: Vec<usize>,
// Maps each projection in original query to corresponding result column (`projection` is just a subset without aggregating projections)
pub aggregations: Vec<(usize, Aggregator)>,
pub order_by: Vec<(usize, bool)>,
pub order_by: Vec<(usize, bool)>, // (index, desc)
pub level: u32,
pub scanned_range: Range<usize>,
pub batch_count: usize,
Expand All @@ -21,6 +21,7 @@ pub struct BatchResult<'a> {
pub unsafe_referenced_buffers: Vec<BoxedData<'a>>,
}


impl<'a> BatchResult<'a> {
pub fn len(&self) -> usize {
self.columns.first().map_or(0, |s| s.len())
Expand Down Expand Up @@ -136,7 +137,9 @@ pub fn combine<'a>(
let mut partitioning = qp.partition(l, r, limit, false);
for i in 1..(lprojection.len() - 1) {
let (l, r) = unify_types(&mut qp, left[lprojection[i]], right[rprojection[i]]);
partitioning = qp.subpartition(partitioning, l, r, false);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
partitioning = qp.subpartition(partitioning,l, r, false);
}

let last = lprojection.len() - 1;
Expand Down Expand Up @@ -232,6 +235,8 @@ pub fn combine<'a>(
let (index1, desc) = batch1.order_by[i];
let (index2, _) = batch2.order_by[i];
let (l, r) = unify_types(&mut qp, left[index1], right[index2]);
let l = null_to_val(&mut qp, l);
let r = null_to_val(&mut qp, r);
partitioning = qp.subpartition(partitioning, l, r, desc);
}
let l = null_to_val(&mut qp, left[final_sort_col_index1]);
Expand Down
Loading

0 comments on commit 22d3b33

Please sign in to comment.