Skip to content

Commit

Permalink
chore: use peek to sort efficiently
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Nov 5, 2024
1 parent a18da8f commit d3ca252
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 82 deletions.
18 changes: 9 additions & 9 deletions benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ fn bulk_insert(c: &mut Criterion) {
let mut group = c.benchmark_group("bulk_operations");

// Test different batch sizes
let batch_sizes = [10, 100, 1000];
let value_sizes = [10, 100, 1000];
let batch_sizes = [1000];
let value_sizes = [1000];

for &batch_size in &batch_sizes {
for &value_size in &value_sizes {
Expand Down Expand Up @@ -243,9 +243,9 @@ fn concurrent_insert(c: &mut Criterion) {
// Configuration
let item_count = 100_000;

let key_sizes = vec![16, 64, 256, 1024]; // in bytes
let value_sizes = vec![32, 128, 512, 2048]; // in bytes
let thread_counts = vec![1, 2, 4, num_cpus::get()];
let key_sizes = vec![256]; // in bytes
let value_sizes = vec![512]; // in bytes
let thread_counts = vec![4];

let mut group = c.benchmark_group("concurrent_inserts");
group.throughput(criterion::Throughput::Elements(item_count as u64));
Expand Down Expand Up @@ -474,11 +474,11 @@ fn concurrent_workload(c: &mut Criterion) {

criterion_group!(
benches_sequential,
sequential_insert,
random_insert,
// sequential_insert,
// random_insert,
bulk_insert,
sequential_insert_read
// sequential_insert_read
);
criterion_group!(benches_range, range_scan);
criterion_group!(benches_concurrent, concurrent_insert, concurrent_workload);
criterion_group!(benches_concurrent, concurrent_insert);
criterion_main!(benches_sequential, benches_range, benches_concurrent);
252 changes: 179 additions & 73 deletions src/storage/kv/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;

Expand Down Expand Up @@ -395,87 +395,147 @@ impl Transaction {
self.read_key_ranges.push(rs_entry);
}

// Initialize map to store merged results
let mut results_map: BTreeMap<Vec<u8>, (Vec<u8>, u64, u64)> = BTreeMap::new();
// First, collect results from the write set that fall within the range
// Also keep track of deleted keys
let mut deleted_keys = HashSet::new();

// First, collect results from the write set that fall within the range
for (key, entries) in &self.write_set {
// Skip if the key is not in range
if !Self::is_key_in_range(key, &range) {
continue;
}
// Get snapshot iterator
let snap = self.snapshot.as_ref().unwrap();
let mut snap_iter = snap.range(bound_range.clone()).peekable();

// Get the latest entry for this key
if let Some(last_entry) = entries.last() {
if !last_entry.e.is_deleted_or_tombstone() {
results_map.insert(
key.to_vec(),
(last_entry.e.value.to_vec(), self.read_ts, last_entry.e.ts),
);
} else {
deleted_keys.insert(key.clone());
}
}
}
// Create write set iterator
let mut write_set_iter = WriteSetRangeIterator::new(&self.write_set, range).peekable();

// Get results from snapshot
let snap = self.snapshot.as_ref().unwrap();
let ranger = snap.range(bound_range);
// Pre-allocate results vector
let mut results = Vec::with_capacity(limit.unwrap_or(100));

// Iterate over the keys in the range from the snapshot
'outer: for (key, value, version, ts) in ranger {
// If a limit is set and we've already got enough results, break the loop
// Merge iterators
loop {
if let Some(limit) = limit {
if results_map.len() >= limit {
if results.len() >= limit {
break;
}
}

// Apply all filters. If any filter fails, skip this key
for filter in &FILTERS {
if filter.apply(value).is_err() {
continue 'outer;
match (snap_iter.peek(), write_set_iter.peek()) {
(
Some(&(ref snap_key, ref snap_value, ref snap_version, ref snap_ts)),
Some((ref write_key, ref write_entry)),
) => {
// Clean snapshot key (remove null terminator) for comparison
let clean_snap_key = &snap_key[..snap_key.len() - 1];

match clean_snap_key.cmp(write_key.as_ref()) {
std::cmp::Ordering::Less => {
// Snapshot key is smaller, process it if version is valid
if **snap_version <= self.read_ts {
// Apply filters
let mut skip = false;
for filter in &FILTERS {
if filter.apply(snap_value).is_err() {
skip = true;
break;
}
}

if !skip {
// Add to read set for conflict detection
let key_bytes = Bytes::copy_from_slice(clean_snap_key);
self.read_set.push(ReadSetEntry::new(
key_bytes,
**snap_version,
self.savepoints,
));

// Add to results
if let Ok(resolved_value) = snap_value.resolve(&self.core) {
results.push((
clean_snap_key.to_vec(),
resolved_value,
**snap_version,
**snap_ts,
));
}
}
}
snap_iter.next();
}
std::cmp::Ordering::Equal => {
// Keys are equal, prefer write set entry unless it's deleted
if !write_entry.e.is_deleted_or_tombstone() {
results.push((
write_key.to_vec(),
write_entry.e.value.to_vec(),
self.read_ts,
write_entry.e.ts,
));
}
// Advance both iterators since they're for the same key
snap_iter.next();
write_set_iter.next();
}
std::cmp::Ordering::Greater => {
// Write set key is smaller
if !write_entry.e.is_deleted_or_tombstone() {
results.push((
write_key.to_vec(),
write_entry.e.value.to_vec(),
self.read_ts,
write_entry.e.ts,
));
}
write_set_iter.next();
}
}
}
}

// Remove null terminator from key for comparison
let mut clean_key = key.to_vec();
clean_key.truncate(clean_key.len() - 1);
let clean_key_slice = &clean_key[..];

// Skip if this key has been overwritten or deleted in write set
if self.write_set.contains_key(clean_key_slice)
|| deleted_keys.contains(clean_key_slice)
{
continue;
}

// Add to read set for conflict detection
let key_bytes = Bytes::copy_from_slice(&clean_key);
let entry = ReadSetEntry::new(key_bytes, *version, self.savepoints);
self.read_set.push(entry);
(Some(&(ref snap_key, ref snap_value, ref snap_version, ref snap_ts)), None) => {
// Only snapshot entries remaining
if **snap_version <= self.read_ts {
let clean_snap_key = &snap_key[..snap_key.len() - 1];

// Apply filters
let mut skip = false;
for filter in &FILTERS {
if filter.apply(snap_value).is_err() {
skip = true;
break;
}
}

if let std::collections::btree_map::Entry::Vacant(e) = results_map.entry(clean_key) {
if let Ok(resolved_value) = value.resolve(&self.core) {
e.insert((resolved_value, *version, *ts));
if !skip {
// Add to read set for conflict detection
let key_bytes = Bytes::copy_from_slice(clean_snap_key);
self.read_set.push(ReadSetEntry::new(
key_bytes,
**snap_version,
self.savepoints,
));

// Add to results
if let Ok(resolved_value) = snap_value.resolve(&self.core) {
results.push((
clean_snap_key.to_vec(),
resolved_value,
**snap_version,
**snap_ts,
));
}
}
}
snap_iter.next();
}
(None, Some((ref write_key, ref write_entry))) => {
// Only write set entries remaining
if !write_entry.e.is_deleted_or_tombstone() {
results.push((
write_key.to_vec(),
write_entry.e.value.to_vec(),
self.read_ts,
write_entry.e.ts,
));
}
write_set_iter.next();
}
(None, None) => break,
}
}

// Convert the map to a vector of results
let mut results: Vec<(Vec<u8>, Vec<u8>, u64, u64)> = results_map
.into_iter()
.map(|(k, (v, ver, ts))| (k, v, ver, ts))
.collect();

// Apply limit if specified
if let Some(limit) = limit {
results.truncate(limit);
}

Ok(results)
}

Expand All @@ -489,13 +549,13 @@ impl Transaction {
Bound::Excluded(start) => key > *start,
Bound::Unbounded => true,
};

let end_exclusive = match range.end_bound() {
Bound::Included(end) => key <= *end,
Bound::Excluded(end) => key < *end,
Bound::Unbounded => true,
};

start_inclusive && end_exclusive
}

Expand Down Expand Up @@ -601,6 +661,52 @@ impl Transaction {
}
}

struct WriteSetRangeIterator<'a> {
// Use a sorted vec of references instead of direct hash map iteration
entries: Vec<(&'a Bytes, &'a WriteSetEntry)>,
current: usize,
}

impl<'a> WriteSetRangeIterator<'a> {
fn new<R>(write_set: &'a HashMap<Bytes, Vec<WriteSetEntry>>, range: R) -> Self
where
R: RangeBounds<&'a [u8]>,
{
// Collect and sort all valid entries up front
let mut entries: Vec<_> = write_set
.iter()
.filter_map(|(key, entries)| {
if !Transaction::is_key_in_range(key, &range) {
return None;
}
entries.last().map(|entry| (key, entry))
})
.collect();

// Sort by key
entries.sort_by(|a, b| a.0.cmp(b.0));

Self {
entries,
current: 0,
}
}
}

impl<'a> Iterator for WriteSetRangeIterator<'a> {
type Item = (&'a Bytes, &'a WriteSetEntry);

fn next(&mut self) -> Option<Self::Item> {
if self.current < self.entries.len() {
let result = self.entries[self.current];
self.current += 1;
Some(result)
} else {
None
}
}
}

/// Implement savepoints logic for transactions.
impl Transaction {
/// After calling this method the subsequent modifications within this
Expand Down Expand Up @@ -3247,11 +3353,11 @@ mod tests {

// Verify the results
assert_eq!(results.len(), 3);
assert_eq!(results[0].0, key1);
assert_eq!(results[0].0, key1.to_vec());
assert_eq!(results[0].1, value1);
assert_eq!(results[1].0, key2);
assert_eq!(results[1].0, key2.to_vec());
assert_eq!(results[1].1, value2);
assert_eq!(results[2].0, key3);
assert_eq!(results[2].0, key3.to_vec());
assert_eq!(results[2].1, value3);
}

Expand Down

0 comments on commit d3ca252

Please sign in to comment.