Skip to content

Commit

Permalink
chore: remove cardinality statistic gathering for fixed width data bl…
Browse files Browse the repository at this point in the history
…ock (#3076)

This PR tries to remove the cardinality calculation for fixed width data
block, so we can speed up the write speed.
#3069
  • Loading branch information
broccoliSpicy authored Nov 2, 2024
1 parent c9f8a49 commit ceaf49c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 234 deletions.
266 changes: 39 additions & 227 deletions rust/lance-encoding/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use arrow_array::{Array, UInt64Array};
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use num_traits::PrimInt;

use crate::data::{
DataBlock, DictionaryDataBlock, FixedWidthDataBlock, OpaqueBlock, StructDataBlock,
Expand Down Expand Up @@ -159,124 +160,62 @@ impl FixedWidthDataBlock {
let data_size_array = Arc::new(UInt64Array::from(vec![data_size]));

// compute this datablock's max_bit_width
let max_bit_width = self.max_bit_width();
let max_bit_widths = self.max_bit_widths();

let cardinality = self.cardinality();
let mut info = self.block_info.0.write().unwrap();
info.insert(Stat::DataSize, data_size_array);
info.insert(Stat::BitWidth, max_bit_width);
info.insert(Stat::Cardinality, cardinality);
info.insert(Stat::BitWidth, max_bit_widths);
}

fn max_bit_width(&mut self) -> Arc<dyn Array> {
fn max_bit_widths(&mut self) -> Arc<dyn Array> {
assert!(self.num_values > 0);

let chunk_size = 1024;
let mut max_bit_widths = vec![];
match self.bits_per_value {
8 => {
let u8_slice_ref = self.data.borrow_to_typed_slice::<u8>();
let u8_slice = u8_slice_ref.as_ref();
for chunk in u8_slice.chunks(chunk_size) {
let max_value = chunk.iter().fold(0, |acc, &x| acc | x);
let max_bit_width = self.bits_per_value - max_value.leading_zeros() as u64;
max_bit_widths.push(max_bit_width);
}
Arc::new(UInt64Array::from(max_bit_widths))
}
16 => {
let u16_slice_ref = self.data.borrow_to_typed_slice::<u16>();
let u16_slice = u16_slice_ref.as_ref();
for chunk in u16_slice.chunks(chunk_size) {
let max_value = chunk.iter().fold(0, |acc, &x| acc | x);
let max_bit_width = self.bits_per_value - max_value.leading_zeros() as u64;
max_bit_widths.push(max_bit_width);
}
Arc::new(UInt64Array::from(max_bit_widths))
}
32 => {
let u32_slice_ref = self.data.borrow_to_typed_slice::<u32>();
let u32_slice = u32_slice_ref.as_ref();
for chunk in u32_slice.chunks(chunk_size) {
let max_value = chunk.iter().fold(0, |acc, &x| acc | x);
let max_bit_width = self.bits_per_value - max_value.leading_zeros() as u64;
max_bit_widths.push(max_bit_width);
}
Arc::new(UInt64Array::from(max_bit_widths))
}
64 => {
let u64_slice_ref = self.data.borrow_to_typed_slice::<u64>();
let u64_slice = u64_slice_ref.as_ref();
for chunk in u64_slice.chunks(chunk_size) {
let max_value = chunk.iter().fold(0, |acc, &x| acc | x);
let max_bit_width = self.bits_per_value - max_value.leading_zeros() as u64;
max_bit_widths.push(max_bit_width);
}
Arc::new(UInt64Array::from(max_bit_widths))
}
// when self.bits_per_value is not (8, 16, 32, 64), it is already bit-packed or we don't
// bit-pack them(Decimal128, Decimal256), so we return `self.bit_per_value` as their `max_bit_width`
_ => Arc::new(UInt64Array::from(vec![self.bits_per_value])),
const CHUNK_SIZE: usize = 1024;

fn calculate_max_bit_width<T: PrimInt>(slice: &[T], bits_per_value: u64) -> Vec<u64> {
slice
.chunks(CHUNK_SIZE)
.map(|chunk| {
let max_value = chunk.iter().fold(T::zero(), |acc, &x| acc | x);
bits_per_value - max_value.leading_zeros() as u64
})
.collect()
}
}

fn cardinality(&mut self) -> Arc<dyn Array> {
match self.bits_per_value {
8 => {
let u8_slice_ref = self.data.borrow_to_typed_slice::<u8>();
let u8_slice = u8_slice_ref.as_ref();

const PRECISION: u8 = 12;
let mut hll: HyperLogLogPlus<u8, RandomState> =
HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
for val in u8_slice {
hll.insert(val);
}
let cardinality = hll.count() as u64;
Arc::new(UInt64Array::from(vec![cardinality]))
let u8_slice = self.data.borrow_to_typed_slice::<u8>();
let u8_slice = u8_slice.as_ref();
Arc::new(UInt64Array::from(calculate_max_bit_width(
u8_slice,
self.bits_per_value,
)))
}
16 => {
let u16_slice_ref = self.data.borrow_to_typed_slice::<u16>();
let u16_slice = u16_slice_ref.as_ref();
const PRECISION: u8 = 12;
let mut hll: HyperLogLogPlus<u16, RandomState> =
HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
for val in u16_slice {
hll.insert(val);
}
let cardinality = hll.count() as u64;
Arc::new(UInt64Array::from(vec![cardinality]))
let u16_slice = self.data.borrow_to_typed_slice::<u16>();
let u16_slice = u16_slice.as_ref();
Arc::new(UInt64Array::from(calculate_max_bit_width(
u16_slice,
self.bits_per_value,
)))
}
32 => {
let u32_slice_ref = self.data.borrow_to_typed_slice::<u32>();
let u32_slice = u32_slice_ref.as_ref();

const PRECISION: u8 = 12;
let mut hll: HyperLogLogPlus<u32, RandomState> =
HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
for val in u32_slice {
hll.insert(val);
}
let cardinality = hll.count() as u64;
Arc::new(UInt64Array::from(vec![cardinality]))
let u32_slice = self.data.borrow_to_typed_slice::<u32>();
let u32_slice = u32_slice.as_ref();
Arc::new(UInt64Array::from(calculate_max_bit_width(
u32_slice,
self.bits_per_value,
)))
}
64 => {
let u64_slice_ref = self.data.borrow_to_typed_slice::<u64>();
let u64_slice = u64_slice_ref.as_ref();

const PRECISION: u8 = 12;
let mut hll: HyperLogLogPlus<u64, RandomState> =
HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
for val in u64_slice {
hll.insert(val);
}
let cardinality = hll.count() as u64;
Arc::new(UInt64Array::from(vec![cardinality]))
let u64_slice = self.data.borrow_to_typed_slice::<u64>();
let u64_slice = u64_slice.as_ref();
Arc::new(UInt64Array::from(calculate_max_bit_width(
u64_slice,
self.bits_per_value,
)))
}
// when self.bits_per_value is not (8, 16, 32, 64), it's a `DataBlock` generated from
// `bitpack` and it's cardinality should equal to it's parent `DataBlock`'s cardinaliry
// (Except Decimal128, Decimal256)
_ => Arc::new(UInt64Array::from(vec![self.num_values])),
_ => Arc::new(UInt64Array::from(vec![self.bits_per_value])),
}
}
}
Expand Down Expand Up @@ -1078,133 +1017,6 @@ mod tests {
);
}

#[test]
fn test_cardinality_fixed_width_datablock() {
let int8_array = Int8Array::from(vec![1, 2, 3]);
let array_ref: ArrayRef = Arc::new(int8_array.clone());
let mut block = DataBlock::from_array(array_ref);
println!(
"block.get_stat(Stat::Cardinality): {:?}",
block.get_stat(Stat::Cardinality)
);

let expected_bit_width = Arc::new(UInt64Array::from(vec![3])) as ArrayRef;
let actual_bit_width = block.get_stat(Stat::Cardinality);

assert_eq!(
actual_bit_width,
Some(expected_bit_width.clone()),
"Expected Stat::Cardinality to be {:?} for data block generated from array: {:?}",
expected_bit_width,
int8_array
);

let int8_array = Int8Array::from(vec![1, 1, 1]);
let array_ref: ArrayRef = Arc::new(int8_array.clone());
let mut block = DataBlock::from_array(array_ref);
println!(
"block.get_stat(Stat::Cardinality): {:?}",
block.get_stat(Stat::Cardinality)
);

let expected_bit_width = Arc::new(UInt64Array::from(vec![1])) as ArrayRef;
let actual_bit_width = block.get_stat(Stat::Cardinality);

assert_eq!(
actual_bit_width,
Some(expected_bit_width.clone()),
"Expected Stat::Cardinality to be {:?} for data block generated from array: {:?}",
expected_bit_width,
int8_array
);

let int8_array = Int8Array::from_iter(0..10);
let array_ref: ArrayRef = Arc::new(int8_array.clone());
let mut block = DataBlock::from_array(array_ref);
println!(
"block.get_stat(Stat::Cardinality): {:?}",
block.get_stat(Stat::Cardinality)
);

let expected_bit_width = Arc::new(UInt64Array::from(vec![10])) as ArrayRef;
let actual_bit_width = block.get_stat(Stat::Cardinality);

assert_eq!(
actual_bit_width,
Some(expected_bit_width.clone()),
"Expected Stat::Cardinality to be {:?} for data block generated from array: {:?}",
expected_bit_width,
int8_array
);

let int8_array = Int8Array::from_iter(-10..10);
let array_ref: ArrayRef = Arc::new(int8_array.clone());
let mut block = DataBlock::from_array(array_ref);
println!(
"block.get_stat(Stat::Cardinality): {:?}",
block.get_stat(Stat::Cardinality)
);

let expected_bit_width = Arc::new(UInt64Array::from(vec![20])) as ArrayRef;
let actual_bit_width = block.get_stat(Stat::Cardinality);

assert_eq!(
actual_bit_width,
Some(expected_bit_width.clone()),
"Expected Stat::Cardinality to be {:?} for data block generated from array: {:?}",
expected_bit_width,
int8_array
);

let int8_array = Int8Array::from_iter(-100..100);
let int8_array2 = Int8Array::from_iter(-10..10);
let array_ref1: ArrayRef = Arc::new(int8_array.clone());
let array_ref2: ArrayRef = Arc::new(int8_array2.clone());
let mut block = DataBlock::from_arrays(
&[array_ref1, array_ref2],
(int8_array.len() + int8_array2.len()) as u64,
);
println!(
"block.get_stat(Stat::Cardinality): {:?}",
block.get_stat(Stat::Cardinality)
);

let expected_bit_width = Arc::new(UInt64Array::from(vec![200])) as ArrayRef;
let actual_bit_width = block.get_stat(Stat::Cardinality);

assert_eq!(
actual_bit_width,
Some(expected_bit_width.clone()),
"Expected Stat::Cardinality to be {:?} for data block generated from array: {:?}",
expected_bit_width,
int8_array
);

let int16_array = Int16Array::from_iter(-100..100);
let int16_array2 = Int16Array::from_iter(-10..10);
let array_ref1: ArrayRef = Arc::new(int16_array.clone());
let array_ref2: ArrayRef = Arc::new(int16_array2);
let mut block = DataBlock::from_arrays(
&[array_ref1, array_ref2],
(int16_array.len() + int8_array2.len()) as u64,
);
println!(
"block.get_stat(Stat::Cardinality): {:?}",
block.get_stat(Stat::Cardinality)
);

let expected_bit_width = Arc::new(UInt64Array::from(vec![200])) as ArrayRef;
let actual_bit_width = block.get_stat(Stat::Cardinality);

assert_eq!(
actual_bit_width,
Some(expected_bit_width.clone()),
"Expected Stat::Cardinality to be {:?} for data block generated from array: {:?}",
expected_bit_width,
int16_array
);
}

#[test]
fn test_cardinality_variable_width_datablock() {
let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
Expand Down
9 changes: 3 additions & 6 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,12 +1022,9 @@ pub fn flat_bm25_search(
let doc_iter = iter_str_array(&batch[doc_col]);
let mut scores = Vec::with_capacity(batch.num_rows());
for doc in doc_iter {
let doc = match doc {
Some(doc) => doc,
None => {
scores.push(0.0);
continue;
}
let Some(doc) = doc else {
scores.push(0.0);
continue;
};

let doc_tokens = collect_tokens(doc, tokenizer, Some(query_tokens));
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ impl TryFrom<pb::Transaction> for Transaction {
.into_iter()
.map(Fragment::try_from)
.collect::<Result<Vec<_>>>()?,
schema: Schema::from(&Fields(schema.clone())),
schema: Schema::from(&Fields(schema)),
config_upsert_values: config_upsert_option,
})
}
Expand Down

0 comments on commit ceaf49c

Please sign in to comment.