Skip to content

Commit

Permalink
bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Mar 12, 2024
1 parent 37dae96 commit 0d0e1ce
Show file tree
Hide file tree
Showing 23 changed files with 763 additions and 127 deletions.
8 changes: 6 additions & 2 deletions locustdb-derive/src/reify_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ fn types(t: &Ident) -> Option<Vec<Type>> {
"Float" => Some(vec![Type::F64]),
"NullableInteger" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64]),
"NullableFloat" => Some(vec![Type::NullableF64]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr]),
"Primitive" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::OptStr, Type::OptF64]),
"NullablePrimitive" => Some(vec![Type::NullableU8, Type::NullableU16, Type::NullableU32, Type::NullableI64, Type::NullableF64, Type::NullableStr]),
"PrimitiveUSize" => Some(vec![Type::U8, Type::U16, Type::U32, Type::U64, Type::I64, Type::F64, Type::Str, Type::USize]),
"PrimitiveNoU64" => Some(vec![Type::U8, Type::U16, Type::U32, Type::I64, Type::F64, Type::Str]),
Expand All @@ -227,7 +227,9 @@ enum Type {
I64,
F64,
Str,
OptStr,

OptStr, // Option<&str>, used when sorting instead of the raw values + present bit vec representation
OptF64, // Option<OrderedFloat<f64>>, used when sorting

NullableU8,
NullableU16,
Expand Down Expand Up @@ -260,6 +262,7 @@ impl Type {
Type::F64 => parse_quote!(EncodingType::F64),
Type::Str => parse_quote!(EncodingType::Str),
Type::OptStr => parse_quote!(EncodingType::OptStr),
Type::OptF64 => parse_quote!(EncodingType::OptF64),
Type::NullableU8 => parse_quote!(EncodingType::NullableU8),
Type::NullableU16 => parse_quote!(EncodingType::NullableU16),
Type::NullableU32 => parse_quote!(EncodingType::NullableU32),
Expand Down Expand Up @@ -289,6 +292,7 @@ impl Type {
Type::F64 => parse_quote!( let #variable = #variable.buffer.f64(); ),
Type::Str => parse_quote!( let #variable = #variable.buffer.str(); ),
Type::OptStr => parse_quote!( let #variable = #variable.buffer.opt_str(); ),
Type::OptF64 => parse_quote!( let #variable = #variable.buffer.opt_f64(); ),
Type::NullableU8 => parse_quote!( let #variable = #variable.buffer.nullable_u8(); ),
Type::NullableU16 => parse_quote!( let #variable = #variable.buffer.nullable_u16(); ),
Type::NullableU32 => parse_quote!( let #variable = #variable.buffer.nullable_u32(); ),
Expand Down
44 changes: 44 additions & 0 deletions src/engine/data_types/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub trait Data<'a>: Send + Sync {
/// Returns the data as a `&[OrderedFloat<f64>]`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_f64")`.
fn cast_ref_f64(&self) -> &[OrderedFloat<f64>] {
panic!("{}", self.type_error("cast_ref_f64"))
}
/// Returns the data as a `&[Option<OrderedFloat<f64>>]`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_opt_f64")`.
fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
panic!("{}", self.type_error("cast_ref_opt_f64"))
}
/// Returns the data as a `&[u32]`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_u32")`.
fn cast_ref_u32(&self) -> &[u32] {
panic!("{}", self.type_error("cast_ref_u32"))
}
Expand Down Expand Up @@ -109,9 +112,15 @@ pub trait Data<'a>: Send + Sync {
/// Returns the data as a mutable `Vec<OrderedFloat<f64>>`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_mut_f64")`.
fn cast_ref_mut_f64(&mut self) -> &mut Vec<OrderedFloat<f64>> {
panic!("{}", self.type_error("cast_ref_mut_f64"))
}
/// Returns the data as a mutable `Vec<Option<OrderedFloat<f64>>>`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_mut_opt_f64")`.
fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec<Option<OrderedFloat<f64>>> {
panic!("{}", self.type_error("cast_ref_mut_opt_f64"))
}
/// Returns the data as a mutable `Vec<usize>`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_mut_usize")`.
fn cast_ref_mut_usize(&mut self) -> &mut Vec<usize> {
panic!("{}", self.type_error("cast_ref_mut_usize"))
}
/// Returns the data as a mutable `usize`.
///
/// This default body always panics with the message produced by `type_error("cast_ref_mut_null")`.
fn cast_ref_mut_null(&mut self) -> &mut usize {
panic!("{}", self.type_error("cast_ref_mut_null"))
}

fn cast_ref_mut_mixed(&mut self) -> &mut Vec<Val<'a>> {
panic!("{}", self.type_error("cast_ref_mut_mixed"))
Expand Down Expand Up @@ -239,6 +248,9 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for Vec<T> {
/// Specializable (`default`) fallback: panics with the message produced by
/// `type_error("cast_ref_f64")`.
default fn cast_ref_f64(&self) -> &[OrderedFloat<f64>] {
panic!("{}", self.type_error("cast_ref_f64"))
}
/// Specializable (`default`) fallback: panics with the message produced by
/// `type_error("cast_ref_opt_f64")`.
default fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
panic!("{}", self.type_error("cast_ref_opt_f64"))
}
/// Specializable (`default`) fallback: panics with the message produced by
/// `type_error("cast_ref_usize")`.
default fn cast_ref_usize(&self) -> &[usize] {
panic!("{}", self.type_error("cast_ref_usize"))
}
Expand Down Expand Up @@ -278,6 +290,9 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for Vec<T> {
/// Specializable (`default`) fallback: panics with the message produced by
/// `type_error("cast_ref_mut_f64")`.
default fn cast_ref_mut_f64(&mut self) -> &mut Vec<OrderedFloat<f64>> {
panic!("{}", self.type_error("cast_ref_mut_f64"))
}
/// Specializable (`default`) fallback: panics with the message produced by
/// `type_error("cast_ref_mut_opt_f64")`.
default fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec<Option<OrderedFloat<f64>>> {
panic!("{}", self.type_error("cast_ref_mut_opt_f64"))
}
/// Specializable (`default`) fallback: panics with the message produced by
/// `type_error("cast_ref_mut_usize")`.
default fn cast_ref_mut_usize(&mut self) -> &mut Vec<usize> {
panic!("{}", self.type_error("cast_ref_mut_usize"))
}
Expand Down Expand Up @@ -426,6 +441,24 @@ impl<'a> Data<'a> for Vec<OrderedFloat<f64>> {
}
}

/// `Data` implementation for vectors of optional floats (`Option<OrderedFloat<f64>>`).
impl<'a> Data<'a> for Vec<Option<OrderedFloat<f64>>> {
    /// Exposes the vector's contents as a slice of optional floats.
    fn cast_ref_opt_f64(&self) -> &[Option<OrderedFloat<f64>>] {
        self
    }

    /// Exposes the vector itself for mutation.
    fn cast_ref_mut_opt_f64(&mut self) -> &mut Vec<Option<OrderedFloat<f64>>> {
        self
    }

    /// Converts every entry to a `Val`: `None` becomes `Val::Null` and
    /// `Some(f)` becomes `Val::Float(f)`.
    fn to_mixed(&self) -> Vec<Val<'a>> {
        self.iter()
            .map(|entry| entry.map_or(Val::Null, Val::Float))
            .collect()
    }
}


impl<'a> Data<'a> for Vec<MergeOp> {
fn cast_ref_merge_op(&self) -> &[MergeOp] {
self
Expand Down Expand Up @@ -476,6 +509,13 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for &'a [T] {
format!("&{:?}{}", T::t(), display_slice(self, 120))
}

/// Builds a boxed `NullableVec` from a copy of this slice (`to_vec`) and a
/// copy of the `present` bytes.
fn make_nullable(&mut self, present: &[u8]) -> BoxedData<'a> {
    let data = self.to_vec();
    let present = present.to_vec();
    Box::new(NullableVec { data, present })
}

// Copied from Data and marked default because specialization demands it
default fn cast_ref_merge_op(&self) -> &[MergeOp] {
panic!("{}", self.type_error("cast_ref_merge_op"))
Expand Down Expand Up @@ -603,6 +643,10 @@ impl<'a> Data<'a> for usize {
}
}

/// Returns the `usize` itself as a mutable reference.
fn cast_ref_mut_null(&mut self) -> &mut usize {
self
}

/// Produces a vector containing `*self` copies of `Val::Null`.
fn to_mixed(&self) -> Vec<Val<'a>> {
vec![Val::Null; *self]
}
Expand Down
13 changes: 11 additions & 2 deletions src/engine/data_types/nullable_vec_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ impl<'a, T: VecData<T> + 'a> Data<'a> for NullableVec<T> {
/// Formats the label `NullableVec<T>.func_name`, where `T` is rendered with
/// the `Debug` output of `T::t()`.
fn type_error(&self, func_name: &str) -> String {
format!("NullableVec<{:?}>.{}", T::t(), func_name)
}
fn slice_box<'b>(&'b self, _: usize, _: usize) -> BoxedData<'b>
fn slice_box<'b>(&'b self, from: usize, to: usize) -> BoxedData<'b>
where
'a: 'b,
{
panic!("nullable slice box!")
// TODO: more efficient implementation that doesn't clone?
let to = min(to, self.len());
let data = self.data[from..to].to_vec();
let mut present = vec![0u8; (to - from + 7) / 8];
for i in from..to {
if self.present.is_set(i) {
present.set(i - from);
}
}
Box::new(NullableVec { data, present })
}

default fn append_all(&mut self, other: &dyn Data<'a>, count: usize) -> Option<BoxedData<'a>> {
Expand Down
75 changes: 62 additions & 13 deletions src/engine/data_types/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use serde::{Deserialize, Serialize};

use crate::mem_store::*;

// WARNING: Changing this enum will break backwards compatibility with existing data
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug, Serialize, Deserialize)]
pub enum EncodingType {
Str,
OptStr,
I64,
U8,
U16,
U32,
U64,
F64,

OptStr,
OptF64,

NullableStr,
NullableI64,
NullableU8,
Expand Down Expand Up @@ -61,7 +64,9 @@ impl EncodingType {
EncodingType::U16 | EncodingType::NullableU16 => EncodingType::NullableU16,
EncodingType::U32 | EncodingType::NullableU32 => EncodingType::NullableU32,
EncodingType::U64 | EncodingType::NullableU64 => EncodingType::NullableU64,
EncodingType::F64 | EncodingType::NullableF64 => EncodingType::NullableF64,
EncodingType::F64 | EncodingType::NullableF64 | EncodingType::OptF64 => {
EncodingType::NullableF64
}
EncodingType::Val => EncodingType::Val,
_ => panic!("{:?} does not have a corresponding nullable type", &self),
}
Expand All @@ -70,6 +75,7 @@ impl EncodingType {
pub fn nullable_fused(&self) -> EncodingType {
match self {
EncodingType::NullableStr => EncodingType::OptStr,
EncodingType::NullableF64 => EncodingType::OptF64,
EncodingType::NullableI64 => EncodingType::I64,
_ => panic!(
"{:?} does not have a corresponding fused nullable type",
Expand All @@ -79,27 +85,64 @@ impl EncodingType {
}

pub fn is_nullable(&self) -> bool {
matches!(
self,
match self {
EncodingType::NullableStr
| EncodingType::NullableI64
| EncodingType::NullableU8
| EncodingType::NullableU16
| EncodingType::NullableU32
| EncodingType::NullableU64
)
| EncodingType::NullableI64
| EncodingType::NullableU8
| EncodingType::NullableU16
| EncodingType::NullableU32
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr
| EncodingType::OptF64 => true,
EncodingType::Str
| EncodingType::I64
| EncodingType::U8
| EncodingType::U16
| EncodingType::U32
| EncodingType::U64
| EncodingType::F64
| EncodingType::USize
| EncodingType::Val
| EncodingType::Null
| EncodingType::ScalarI64
| EncodingType::ScalarStr
| EncodingType::ScalarString
| EncodingType::ConstVal
| EncodingType::ByteSlices(_)
| EncodingType::ValRows
| EncodingType::Premerge
| EncodingType::MergeOp => false,
}
}

pub fn non_nullable(&self) -> EncodingType {
match self {
EncodingType::NullableStr => EncodingType::Str,
EncodingType::NullableStr | EncodingType::OptStr => EncodingType::Str,
EncodingType::NullableI64 => EncodingType::I64,
EncodingType::NullableU8 => EncodingType::U8,
EncodingType::NullableU16 => EncodingType::U16,
EncodingType::NullableU32 => EncodingType::U32,
EncodingType::NullableU64 => EncodingType::U64,
EncodingType::OptStr => EncodingType::Str,
_ => *self,
EncodingType::OptF64 | EncodingType::NullableF64 => EncodingType::F64,
EncodingType::Str
| EncodingType::I64
| EncodingType::U8
| EncodingType::U16
| EncodingType::U32
| EncodingType::U64
| EncodingType::F64
| EncodingType::USize
| EncodingType::Val
| EncodingType::Null
| EncodingType::ScalarI64
| EncodingType::ScalarStr
| EncodingType::ScalarString
| EncodingType::ConstVal
| EncodingType::ByteSlices(_)
| EncodingType::ValRows
| EncodingType::Premerge
| EncodingType::MergeOp => *self,
}
}

Expand All @@ -112,6 +155,8 @@ impl EncodingType {
(_, EncodingType::Val) => EncodingType::Val,
(EncodingType::OptStr, EncodingType::Str) => EncodingType::OptStr,
(EncodingType::Str, EncodingType::OptStr) => EncodingType::OptStr,
(EncodingType::OptF64, EncodingType::F64) => EncodingType::OptF64,
(EncodingType::F64, EncodingType::OptF64) => EncodingType::OptF64,
_ => unimplemented!("lub not implemented for {:?} and {:?}", self, other),
}
}
Expand Down Expand Up @@ -201,6 +246,10 @@ impl Type {
Type::new(BasicType::Boolean, None).mutable()
}

/// Constructs a `Type` from `BasicType::Integer`, passing `None` as the second
/// argument of `Type::new`.
// NOTE(review): the `None` is presumably "no codec" — confirm against `Type::new`.
pub fn integer() -> Type {
Type::new(BasicType::Integer, None)
}

/// Returns `true` only when a codec is present and it is not the identity
/// codec; a missing codec yields `false`.
pub fn is_encoded(&self) -> bool {
    match self.codec.as_ref() {
        Some(codec) => !codec.is_identity(),
        None => false,
    }
}
Expand Down
13 changes: 13 additions & 0 deletions src/engine/data_types/vec_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ impl VecData<OrderedFloat<f64>> for OrderedFloat<f64> {
fn t() -> EncodingType { EncodingType::F64 }
}

/// `VecData` implementation for optional floats, tagged as `EncodingType::OptF64`.
impl VecData<Option<OrderedFloat<f64>>> for Option<OrderedFloat<f64>> {
    /// Borrows the optional-float slice from a `Data` object via `cast_ref_opt_f64`.
    fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [Option<OrderedFloat<f64>>]
    where
        Option<OrderedFloat<f64>>: 'a,
    {
        vec.cast_ref_opt_f64()
    }

    /// Mutably borrows the optional-float vector from a `Data` object via
    /// `cast_ref_mut_opt_f64`.
    fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<Option<OrderedFloat<f64>>>
    where
        Option<OrderedFloat<f64>>: 'a,
    {
        vec.cast_ref_mut_opt_f64()
    }

    /// Maps `Some(f)` to `RawVal::Float(f)` and `None` to `RawVal::Null`.
    fn wrap_one(value: Option<OrderedFloat<f64>>) -> RawVal {
        value.map_or(RawVal::Null, RawVal::Float)
    }

    /// The encoding tag for this element type.
    fn t() -> EncodingType {
        EncodingType::OptF64
    }
}

impl VecData<usize> for usize {
fn unwrap<'a, 'b>(vec: &'b dyn Data<'a>) -> &'b [usize] where usize: 'a { vec.cast_ref_usize() }
fn unwrap_mut<'a, 'b>(vec: &'b mut dyn Data<'a>) -> &'b mut Vec<usize> where usize: 'a { vec.cast_ref_mut_usize() }
Expand Down
12 changes: 12 additions & 0 deletions src/engine/execution/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl BufferRef<Any> {
/// Reinterprets this untyped buffer ref as a `BufferRef<String>` via `transmute`.
pub fn string(self) -> BufferRef<String> { self.transmute() }
/// Reinterprets this untyped buffer ref as a `BufferRef<&str>` via `transmute`.
pub fn str<'a>(self) -> BufferRef<&'a str> { self.transmute() }
/// Reinterprets this untyped buffer ref as a `BufferRef<Option<&str>>` via `transmute`.
pub fn opt_str<'a>(self) -> BufferRef<Option<&'a str>> { self.transmute() }
/// Reinterprets this untyped buffer ref as a `BufferRef<Option<OrderedFloat<f64>>>` via `transmute`.
pub fn opt_f64(self) -> BufferRef<Option<OrderedFloat<f64>>> { self.transmute() }
/// Reinterprets this untyped buffer ref as a `BufferRef<usize>` via `transmute`.
pub fn usize(self) -> BufferRef<usize> { self.transmute() }
/// Reinterprets this buffer ref as a `BufferRef<T>` for any `T` using `mem::transmute`.
// NOTE(review): no SAFETY justification is visible here; presumably `BufferRef<T>` has the
// same layout for every `T` — confirm against the struct definition.
fn transmute<T>(self) -> BufferRef<T> { unsafe { mem::transmute(self) } }
}
Expand Down Expand Up @@ -172,6 +173,12 @@ impl From<BufferRef<Premerge>> for TypedBufferRef {
}
}

/// Wraps a nullable `u8` buffer ref in a `TypedBufferRef` tagged `EncodingType::NullableU8`.
impl From<BufferRef<Nullable<u8>>> for TypedBufferRef {
    fn from(nullable_u8: BufferRef<Nullable<u8>>) -> Self {
        let untyped = nullable_u8.any();
        Self::new(untyped, EncodingType::NullableU8)
    }
}

impl<T> BufferRef<Nullable<T>> {
pub fn cast_non_nullable(self) -> BufferRef<T> { unsafe { mem::transmute(self) } }
pub fn nullable_any(self) -> BufferRef<Nullable<Any>> { unsafe { mem::transmute(self) } }
Expand Down Expand Up @@ -232,6 +239,11 @@ impl TypedBufferRef {
Ok(self.buffer.opt_str())
}

/// Returns the underlying buffer as a `BufferRef<Option<OrderedFloat<f64>>>` when the tag is
/// `EncodingType::OptF64`.
///
/// Otherwise the `ensure!` check fails with a message naming the actual tag
/// (presumably returned as an `Err(QueryError)` — confirm against the macro definition).
pub fn opt_f64(&self) -> Result<BufferRef<Option<OrderedFloat<f64>>>, QueryError> {
ensure!(self.tag == EncodingType::OptF64, "{:?} != OptF64", self.tag);
Ok(self.buffer.opt_f64())
}

pub fn i64(&self) -> Result<BufferRef<i64>, QueryError> {
ensure!(self.tag == EncodingType::I64, "{:?} != I64", self.tag);
Ok(self.buffer.i64())
Expand Down
4 changes: 3 additions & 1 deletion src/engine/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,11 @@ impl<'a> QueryExecutor<'a> {
while has_more {
has_more = false;
for &(op, streamable) in &self.stages[stage].ops {
self.ops[op].execute(stream && streamable, scratchpad)?;
if show && iters == 0 {
println!("{}", self.ops[op].display(true));
}
self.ops[op].execute(stream && streamable, scratchpad)?;
if show && iters == 0 {
for output in self.ops[op].outputs() {
let data = scratchpad.get_any(output);
println!("{}", data.display());
Expand Down
4 changes: 3 additions & 1 deletion src/engine/execution/query_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ impl QueryTask {
let limit = lo.limit as usize;
let offset = lo.offset as usize;
let count = cmp::min(limit, full_result.len() - offset);
full_result.validate().unwrap();

let mut rows = None;
if self.rowformat {
Expand Down Expand Up @@ -450,7 +451,8 @@ impl BasicTypeColumn {
| EncodingType::NullableU32
| EncodingType::NullableU64
| EncodingType::NullableF64
| EncodingType::OptStr => {
| EncodingType::OptStr
| EncodingType::OptF64 => {
let mut vals = vec![];
for i in 0..data.len() {
vals.push(data.get_raw(i));
Expand Down
4 changes: 4 additions & 0 deletions src/engine/execution/scratchpad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ impl<'a> Scratchpad<'a> {
self.null_maps[alias.i] = self.null_maps[original.i];
}

/// Reports whether the two buffer refs resolve (via `resolve`) to the same value.
pub fn is_alias<T>(&self, i: BufferRef<T>, j: BufferRef<T>) -> bool {
    let (lhs, rhs) = (self.resolve(&i), self.resolve(&j));
    lhs == rhs
}

pub fn assemble_nullable<T>(
&mut self,
original: BufferRef<T>,
Expand Down
Loading

0 comments on commit 0d0e1ce

Please sign in to comment.