Skip to content

Commit

Permalink
fix order by for null/nonexistent column
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 15, 2024
1 parent 225aaee commit c9d7b95
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 40 deletions.
4 changes: 4 additions & 0 deletions locustdb-derive/src/reify_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ fn types(t: &Ident) -> Option<Vec<Type>> {
"NullableInteger" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64]),
"NullableFloat" => Some(vec![Type::NullableF64]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]),
"PrimitiveOrVal" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64, Type::Val]),
"NullablePrimitive" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64, Type::NullableStr]),
"PrimitiveUSize" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::USize]),
"PrimitiveNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::Str]),
Expand All @@ -227,6 +228,7 @@ enum Type {
I64,
F64,
Str,
Val,

OptStr, // Option<&str>, used when sorting instead of representation of raw values + present bit vec
OptF64, // Option<OrderedFloat<f64>>, used when sorting
Expand Down Expand Up @@ -261,6 +263,7 @@ impl Type {
Type::I64 => parse_quote!(EncodingType::I64),
Type::F64 => parse_quote!(EncodingType::F64),
Type::Str => parse_quote!(EncodingType::Str),
Type::Val => parse_quote!(EncodingType::Val),
Type::OptStr => parse_quote!(EncodingType::OptStr),
Type::OptF64 => parse_quote!(EncodingType::OptF64),
Type::NullableU8 => parse_quote!(EncodingType::NullableU8),
Expand Down Expand Up @@ -291,6 +294,7 @@ impl Type {
Type::I64 => parse_quote!( let #variable = #variable.buffer.i64(); ),
Type::F64 => parse_quote!( let #variable = #variable.buffer.f64(); ),
Type::Str => parse_quote!( let #variable = #variable.buffer.str(); ),
Type::Val => parse_quote!( let #variable = #variable.buffer.val(); ),
Type::OptStr => parse_quote!( let #variable = #variable.buffer.opt_str(); ),
Type::OptF64 => parse_quote!( let #variable = #variable.buffer.opt_f64(); ),
Type::NullableU8 => parse_quote!( let #variable = #variable.buffer.nullable_u8(); ),
Expand Down
40 changes: 36 additions & 4 deletions src/engine/data_types/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ impl EncodingType {
| EncodingType::NullableU16
| EncodingType::NullableU32
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr
| EncodingType::OptF64 => true,
EncodingType::Str
| EncodingType::NullableF64 => true,
EncodingType::OptStr
| EncodingType::OptF64
| EncodingType::Str
| EncodingType::I64
| EncodingType::U8
| EncodingType::U16
Expand Down Expand Up @@ -146,6 +146,38 @@ impl EncodingType {
}
}

/// Reports whether this encoding type is one of the constant encodings.
///
/// Returns `true` for `ScalarI64`, `ScalarStr`, `ScalarString`, `Null` and
/// `ConstVal`, and `false` for every other variant.
///
/// The match lists every variant explicitly (no wildcard arm) so that adding a
/// new `EncodingType` variant forces a decision here at compile time.
pub fn is_constant(&self) -> bool {
    match *self {
        // Constant encodings.
        EncodingType::ScalarI64
        | EncodingType::ScalarStr
        | EncodingType::ScalarString
        | EncodingType::Null
        | EncodingType::ConstVal => true,

        // Nullable encodings.
        EncodingType::NullableStr
        | EncodingType::NullableI64
        | EncodingType::NullableU8
        | EncodingType::NullableU16
        | EncodingType::NullableU32
        | EncodingType::NullableU64
        | EncodingType::NullableF64
        // Option-based encodings.
        | EncodingType::OptStr
        | EncodingType::OptF64
        // Plain encodings.
        | EncodingType::Str
        | EncodingType::I64
        | EncodingType::U8
        | EncodingType::U16
        | EncodingType::U32
        | EncodingType::U64
        | EncodingType::F64
        | EncodingType::USize
        | EncodingType::Val
        // Remaining non-constant encodings.
        | EncodingType::ByteSlices(_)
        | EncodingType::ValRows
        | EncodingType::Premerge
        | EncodingType::MergeOp => false,
    }
}

pub fn least_upper_bound(&self, other: EncodingType) -> EncodingType {
if *self == other {
*self
Expand Down
10 changes: 9 additions & 1 deletion src/engine/execution/batch_merging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,15 @@ pub fn combine<'a>(
let (merge_ops, merged_final_sort_col) = if batch1.order_by.len() == 1 {
let (index1, desc) = batch1.order_by[0];
let (index2, _) = batch2.order_by[0];
let (left, right) = unify_types(&mut qp, left[index1], right[index2]);
let mut left = left[index1];
let mut right = right[index2];
if left.tag == EncodingType::Null {
left = qp.cast(left, EncodingType::Val);
}
if right.tag == EncodingType::Null {
right = qp.cast(right, EncodingType::Val);
}
let (left, right) = unify_types(&mut qp, left, right);
qp.merge(left, right, limit, desc)
} else {
let (first_sort_col_index1, desc) = batch1.order_by[0];
Expand Down
2 changes: 2 additions & 0 deletions src/engine/execution/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ impl TypedBufferRef {

/// Returns whether this buffer's type tag is nullable; delegates to `self.tag.is_nullable()`.
pub fn is_nullable(&self) -> bool { self.tag.is_nullable() }

/// Returns whether this buffer's type tag is a constant encoding; delegates to `self.tag.is_constant()`.
pub fn is_constant(&self) -> bool { self.tag.is_constant() }

pub fn nullable_any(&self) -> Result<BufferRef<Nullable<Any>>, QueryError> {
ensure!(self.tag.is_nullable(), "{:?} is not nullable", self.tag);
Ok(self.buffer.cast_nullable_any())
Expand Down
67 changes: 67 additions & 0 deletions src/engine/operators/comparator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use ordered_float::OrderedFloat;

use crate::mem_store::Val;

pub trait Comparator<T> {
fn cmp(left: T, right: T) -> bool;
fn cmp_eq(left: T, right: T) -> bool;
Expand Down Expand Up @@ -65,6 +67,39 @@ impl<'a> Comparator<Option<&'a str>> for CmpLessThan {
fn is_less_than() -> bool { true }
}

// Ordering across variants: Null < Bool < Integer < Str < Float
/// Ascending comparator for dynamically typed [`Val`]s.
///
/// Values of different variants are ordered by variant
/// (`Null < Bool < Integer < Str < Float`); values of the same variant are
/// ordered by their payload.
impl<'a> Comparator<Val<'a>> for CmpLessThan {
    /// Strict comparison: returns `true` iff `left < right`.
    fn cmp(left: Val<'a>, right: Val<'a>) -> bool {
        match (left, right) {
            // Two nulls are equal, so neither is strictly less than the other.
            (Val::Null, Val::Null) => false,
            (Val::Null, _) => true,
            (_, Val::Null) => false,
            // false < true
            (Val::Bool(l), Val::Bool(r)) => !l & r,
            (Val::Bool(_), _) => true,
            (_, Val::Bool(_)) => false,
            (Val::Integer(l), Val::Integer(r)) => l < r,
            (Val::Integer(_), _) => true,
            (_, Val::Integer(_)) => false,
            (Val::Str(l), Val::Str(r)) => l < r,
            (Val::Str(_), _) => true,
            (_, Val::Str(_)) => false,
            (Val::Float(l), Val::Float(r)) => l < r,
        }
    }

    /// Non-strict comparison: returns `true` iff `left <= right`.
    ///
    /// Mirrors `cmp`, but same-variant payloads are compared with `<=` and a
    /// lower-ranked variant on the left always compares as `true`.
    fn cmp_eq(left: Val<'a>, right: Val<'a>) -> bool {
        match (left, right) {
            // Null is the smallest value, so it is <= everything (including Null).
            (Val::Null, _) => true,
            (_, Val::Null) => false,
            (Val::Bool(l), Val::Bool(r)) => l <= r,
            (Val::Bool(_), _) => true,
            (_, Val::Bool(_)) => false,
            (Val::Integer(l), Val::Integer(r)) => l <= r,
            (Val::Integer(_), _) => true,
            (_, Val::Integer(_)) => false,
            (Val::Str(l), Val::Str(r)) => l <= r,
            (Val::Str(_), _) => true,
            (_, Val::Str(_)) => false,
            (Val::Float(l), Val::Float(r)) => l <= r,
        }
    }

    /// This comparator orders ascending.
    fn is_less_than() -> bool { true }
}


#[derive(Debug)]
pub struct CmpGreaterThan;
Expand Down Expand Up @@ -122,3 +157,35 @@ impl<'a> Comparator<Option<&'a str>> for CmpGreaterThan {
fn cmp_eq(left: Option<&str>, right: Option<&str>) -> bool { left >= right }
fn is_less_than() -> bool { false }
}

/// Descending comparator for dynamically typed [`Val`]s.
///
/// Values of different variants are ordered by variant
/// (`Null < Bool < Integer < Str < Float`); values of the same variant are
/// ordered by their payload.
impl<'a> Comparator<Val<'a>> for CmpGreaterThan {
    /// Strict comparison: returns `true` iff `left > right`.
    fn cmp(left: Val<'a>, right: Val<'a>) -> bool {
        match (left, right) {
            // Null is the smallest value, so it is never greater than anything.
            (Val::Null, _) => false,
            (_, Val::Null) => true,
            // true > false
            (Val::Bool(l), Val::Bool(r)) => l & !r,
            (Val::Bool(_), _) => false,
            (_, Val::Bool(_)) => true,
            (Val::Integer(l), Val::Integer(r)) => l > r,
            (Val::Integer(_), _) => false,
            (_, Val::Integer(_)) => true,
            (Val::Str(l), Val::Str(r)) => l > r,
            (Val::Str(_), _) => false,
            (_, Val::Str(_)) => true,
            (Val::Float(l), Val::Float(r)) => l > r,
        }
    }

    /// Non-strict comparison: returns `true` iff `left >= right`.
    ///
    /// Mirrors `cmp`, but same-variant payloads are compared with `>=`, two
    /// nulls compare as `true`, and a higher-ranked variant on the left always
    /// compares as `true`.
    fn cmp_eq(left: Val<'a>, right: Val<'a>) -> bool {
        match (left, right) {
            (Val::Null, Val::Null) => true,
            (Val::Null, _) => false,
            (_, Val::Null) => true,
            (Val::Bool(l), Val::Bool(r)) => l >= r,
            (Val::Bool(_), _) => false,
            (_, Val::Bool(_)) => true,
            (Val::Integer(l), Val::Integer(r)) => l >= r,
            (Val::Integer(_), _) => false,
            (_, Val::Integer(_)) => true,
            (Val::Str(l), Val::Str(r)) => l >= r,
            (Val::Str(_), _) => false,
            (_, Val::Str(_)) => true,
            (Val::Float(l), Val::Float(r)) => l >= r,
        }
    }

    /// This comparator orders descending.
    fn is_less_than() -> bool { false }
}
3 changes: 2 additions & 1 deletion src/engine/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ mod merge_partitioned;
mod nonzero_compact;
mod nonzero_indices;
mod null_vec;
mod null_to_vec;
mod null_to_val;
mod null_vec_like;
mod numeric_operators;
mod parameterized_vec_vec_int_op;
Expand All @@ -69,4 +71,3 @@ mod slice_pack;
mod slice_unpack;

mod aggregator;

46 changes: 46 additions & 0 deletions src/engine/operators/null_to_val.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::engine::*;
use crate::mem_store::Val;

/// Operator that expands a null input into a buffer of `Val::Null` entries.
///
/// `init` fills the output with `batch_size` copies of `Val::Null`; `execute`
/// truncates the output when the input is shorter than the batch size.
#[derive(Debug)]
pub struct NullToVal {
// Input buffer; only its length is read.
pub input: BufferRef<Any>,
// Output buffer holding the `Val::Null` entries.
pub output: BufferRef<Val<'static>>,

// Batch size recorded by `init`; compared against the input length in `execute`.
pub batch_size: usize,
}

/// `VecOperator` implementation that materialises a null input as `Val::Null` entries.
impl<'a> VecOperator<'a> for NullToVal {
    /// Shrinks the pre-filled output to the input's length when the input is
    /// shorter than the batch size; otherwise leaves the output untouched.
    fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
        let input_len = scratchpad.get_any(self.input).len();
        if input_len < self.batch_size {
            scratchpad.get_mut(self.output).truncate(input_len);
        }
        Ok(())
    }

    /// Pre-fills the output with `batch_size` copies of `Val::Null` and records the batch size.
    fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) {
        scratchpad.set(self.output, vec![Val::Null; batch_size]);
        self.batch_size = batch_size;
    }

    fn inputs(&self) -> Vec<BufferRef<Any>> {
        let input = self.input.any();
        vec![input]
    }

    fn outputs(&self) -> Vec<BufferRef<Any>> {
        let output = self.output.any();
        vec![output]
    }

    fn can_stream_input(&self, _: usize) -> bool { true }

    fn can_stream_output(&self, _: usize) -> bool { true }

    fn allocates(&self) -> bool { true }

    /// Human-readable description of this operator.
    fn display_op(&self, _: bool) -> String {
        let source = &self.input;
        format!("{} expand as Val", source)
    }
}
54 changes: 54 additions & 0 deletions src/engine/operators/null_to_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::engine::*;

/// Takes a null count and expands it into a nullable vec of the same length,
/// with arbitrary element type and all values set to null.
///
/// `init` allocates `batch_size` default values plus a zeroed presence bitmap of
/// `(batch_size + 7) / 8` bytes; `execute` truncates both when the input is
/// shorter than the batch size.
#[derive(Debug)]
pub struct NullToVec<T> {
// Input buffer; only its length is read.
pub input: BufferRef<Any>,
// Nullable output buffer (data plus presence bitmap).
pub output: BufferRef<Nullable<T>>,

// Batch size recorded by `init`; compared against the input length in `execute`.
pub batch_size: usize,
}

/// `VecOperator` implementation that materialises a null input as an all-null `Nullable<T>` buffer.
impl<'a, T: 'a> VecOperator<'a> for NullToVec<T>
where
    T: VecData<T> + Copy + Default,
{
    /// Shrinks the pre-filled data and presence bitmap to the input's length when
    /// the input is shorter than the batch size; otherwise leaves them untouched.
    fn execute(&mut self, _: bool, scratchpad: &mut Scratchpad<'a>) -> Result<(), QueryError> {
        let input_len = scratchpad.get_any(self.input).len();
        if input_len < self.batch_size {
            // One presence bit per element, rounded up to whole bytes.
            let bitmap_bytes = (input_len + 7) / 8;
            let (mut values, mut presence) = scratchpad.get_mut_nullable(self.output);
            values.truncate(input_len);
            presence.truncate(bitmap_bytes);
        }
        Ok(())
    }

    /// Allocates `batch_size` default values with an all-zero presence bitmap and
    /// records the batch size.
    fn init(&mut self, _: usize, batch_size: usize, scratchpad: &mut Scratchpad<'a>) {
        let values = vec![T::default(); batch_size];
        let presence = vec![0u8; (batch_size + 7) / 8];
        scratchpad.set_nullable(self.output, values, presence);
        self.batch_size = batch_size;
    }

    fn inputs(&self) -> Vec<BufferRef<Any>> {
        let input = self.input.any();
        vec![input]
    }

    fn outputs(&self) -> Vec<BufferRef<Any>> {
        let output = self.output.any();
        vec![output]
    }

    fn can_stream_input(&self, _: usize) -> bool { true }

    fn can_stream_output(&self, _: usize) -> bool { true }

    fn allocates(&self) -> bool { true }

    /// Human-readable description of this operator, including the element type.
    fn display_op(&self, _: bool) -> String {
        let element_type = T::t();
        format!("{} expand as Nullable<{:?}>", self.input, element_type)
    }
}
24 changes: 19 additions & 5 deletions src/engine/operators/vector_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use super::merge_keep::*;
use super::merge_partitioned::MergePartitioned;
use super::nonzero_compact::NonzeroCompact;
use super::nonzero_indices::NonzeroIndices;
use super::null_to_val::NullToVal;
use super::null_to_vec::NullToVec;
use super::null_vec::NullVec;
use super::null_vec_like::NullVecLike;
use super::numeric_operators::*;
Expand Down Expand Up @@ -1113,6 +1115,12 @@ pub mod operator {
Ok(Box::new(NullableIntToVal { input, vals: output }) as BoxedOperator<'a>)
}
}
} else if input.tag == EncodingType::Null {
Ok(Box::new(NullToVal {
input: input.any(),
output,
batch_size: 0,
}))
} else {
reify_types! {
"type_conversion";
Expand Down Expand Up @@ -1142,6 +1150,12 @@ pub mod operator {
Ok(Box::new(TypeConversionOperator { input, output }) as BoxedOperator<'a>)
}
}
} else if input.tag == EncodingType::Null {
reify_types! {
"null_to_vec";
output: NullablePrimitive;
Ok(Box::new(NullToVec { input: input.any(), output, batch_size: 0 }))
}
} else {
if input.tag == EncodingType::Str && output.tag == EncodingType::OptStr {
return Ok(Box::new(TypeConversionOperator {
Expand Down Expand Up @@ -1389,13 +1403,13 @@ pub mod operator {
}
if ranking.is_nullable() {
reify_types! {
"sort_indices";
"sort_by_nullable";
ranking: NullablePrimitive;
Ok(Box::new(SortByNullable { ranking, output, indices, descending, stable }))
}
} else {
reify_types! {
"sort_indices";
"sort_by";
ranking: Primitive;
Ok(Box::new(SortBy { ranking, output, indices, descending, stable }))
}
Expand Down Expand Up @@ -1569,13 +1583,13 @@ pub mod operator {
if desc {
reify_types! {
"merge_desc";
left, right, merged_out: Primitive;
left, right, merged_out: PrimitiveOrVal;
Ok(Box::new(Merge { left, right, merged: merged_out, merge_ops: ops_out, limit, c: PhantomData::<CmpGreaterThan> }))
}
} else {
reify_types! {
"merge_desc";
left, right, merged_out: Primitive;
"merge_asc";
left, right, merged_out: PrimitiveOrVal;
Ok(Box::new(Merge { left, right, merged: merged_out, merge_ops: ops_out, limit, c: PhantomData::<CmpLessThan> }))
}
}
Expand Down
Loading

0 comments on commit c9d7b95

Please sign in to comment.