diff --git a/benches/store_bench.rs b/benches/store_bench.rs
index ff49f42..8341fa6 100644
--- a/benches/store_bench.rs
+++ b/benches/store_bench.rs
@@ -141,8 +141,8 @@ fn bulk_insert(c: &mut Criterion) {
     let mut group = c.benchmark_group("bulk_operations");
 
     // Test different batch sizes
-    let batch_sizes = [10, 100, 1000];
-    let value_sizes = [10, 100, 1000];
+    let batch_sizes = [1000];
+    let value_sizes = [1000];
 
     for &batch_size in &batch_sizes {
         for &value_size in &value_sizes {
@@ -243,9 +243,9 @@ fn concurrent_insert(c: &mut Criterion) {
 
     // Configuration
     let item_count = 100_000;
-    let key_sizes = vec![16, 64, 256, 1024]; // in bytes
-    let value_sizes = vec![32, 128, 512, 2048]; // in bytes
-    let thread_counts = vec![1, 2, 4, num_cpus::get()];
+    let key_sizes = vec![256]; // in bytes
+    let value_sizes = vec![512]; // in bytes
+    let thread_counts = vec![4];
 
     let mut group = c.benchmark_group("concurrent_inserts");
     group.throughput(criterion::Throughput::Elements(item_count as u64));
@@ -474,11 +474,11 @@ fn concurrent_workload(c: &mut Criterion) {
 
 criterion_group!(
     benches_sequential,
-    sequential_insert,
-    random_insert,
+    // sequential_insert,
+    // random_insert,
     bulk_insert,
-    sequential_insert_read
+    // sequential_insert_read
 );
 criterion_group!(benches_range, range_scan);
-criterion_group!(benches_concurrent, concurrent_insert, concurrent_workload);
+criterion_group!(benches_concurrent, concurrent_insert);
 criterion_main!(benches_sequential, benches_range, benches_concurrent);
diff --git a/src/storage/kv/transaction.rs b/src/storage/kv/transaction.rs
index 4c7a6c8..6c4a873 100644
--- a/src/storage/kv/transaction.rs
+++ b/src/storage/kv/transaction.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashSet};
+use std::collections::HashSet;
 use std::ops::{Bound, RangeBounds};
 use std::sync::Arc;
 
@@ -395,87 +395,147 @@ impl Transaction {
             self.read_key_ranges.push(rs_entry);
         }
 
-        // Initialize map to store merged results
-        let mut results_map: BTreeMap<Vec<u8>, (Vec<u8>, u64, u64)> = BTreeMap::new();
-        // First, collect results from the write set that fall within the range
-        // Also keep track of deleted keys
-        let mut deleted_keys = HashSet::new();
-
-        // First, collect results from the write set that fall within the range
-        for (key, entries) in &self.write_set {
-            // Skip if the key is not in range
-            if !Self::is_key_in_range(key, &range) {
-                continue;
-            }
+        // Get snapshot iterator
+        let snap = self.snapshot.as_ref().unwrap();
+        let mut snap_iter = snap.range(bound_range.clone()).peekable();
 
-            // Get the latest entry for this key
-            if let Some(last_entry) = entries.last() {
-                if !last_entry.e.is_deleted_or_tombstone() {
-                    results_map.insert(
-                        key.to_vec(),
-                        (last_entry.e.value.to_vec(), self.read_ts, last_entry.e.ts),
-                    );
-                } else {
-                    deleted_keys.insert(key.clone());
-                }
-            }
-        }
+        // Create write set iterator
+        let mut write_set_iter = WriteSetRangeIterator::new(&self.write_set, range).peekable();
 
-        // Get results from snapshot
-        let snap = self.snapshot.as_ref().unwrap();
-        let ranger = snap.range(bound_range);
+        // Pre-allocate results vector
+        let mut results = Vec::with_capacity(limit.unwrap_or(100));
 
-        // Iterate over the keys in the range from the snapshot
-        'outer: for (key, value, version, ts) in ranger {
-            // If a limit is set and we've already got enough results, break the loop
+        // Merge iterators
+        loop {
             if let Some(limit) = limit {
-                if results_map.len() >= limit {
+                if results.len() >= limit {
                     break;
                 }
             }
 
-            // Apply all filters. If any filter fails, skip this key
-            for filter in &FILTERS {
-                if filter.apply(value).is_err() {
-                    continue 'outer;
+            match (snap_iter.peek(), write_set_iter.peek()) {
+                (
+                    Some(&(ref snap_key, ref snap_value, ref snap_version, ref snap_ts)),
+                    Some((ref write_key, ref write_entry)),
+                ) => {
+                    // Clean snapshot key (remove null terminator) for comparison
+                    let clean_snap_key = &snap_key[..snap_key.len() - 1];
+
+                    match clean_snap_key.cmp(write_key.as_ref()) {
+                        std::cmp::Ordering::Less => {
+                            // Snapshot key is smaller, process it if version is valid
+                            if **snap_version <= self.read_ts {
+                                // Apply filters
+                                let mut skip = false;
+                                for filter in &FILTERS {
+                                    if filter.apply(snap_value).is_err() {
+                                        skip = true;
+                                        break;
+                                    }
+                                }
+
+                                if !skip {
+                                    // Add to read set for conflict detection
+                                    let key_bytes = Bytes::copy_from_slice(clean_snap_key);
+                                    self.read_set.push(ReadSetEntry::new(
+                                        key_bytes,
+                                        **snap_version,
+                                        self.savepoints,
+                                    ));
+
+                                    // Add to results
+                                    if let Ok(resolved_value) = snap_value.resolve(&self.core) {
+                                        results.push((
+                                            clean_snap_key.to_vec(),
+                                            resolved_value,
+                                            **snap_version,
+                                            **snap_ts,
+                                        ));
+                                    }
+                                }
+                            }
+                            snap_iter.next();
+                        }
+                        std::cmp::Ordering::Equal => {
+                            // Keys are equal, prefer write set entry unless it's deleted
+                            if !write_entry.e.is_deleted_or_tombstone() {
+                                results.push((
+                                    write_key.to_vec(),
+                                    write_entry.e.value.to_vec(),
+                                    self.read_ts,
+                                    write_entry.e.ts,
+                                ));
+                            }
+                            // Advance both iterators since they're for the same key
+                            snap_iter.next();
+                            write_set_iter.next();
+                        }
+                        std::cmp::Ordering::Greater => {
+                            // Write set key is smaller
+                            if !write_entry.e.is_deleted_or_tombstone() {
+                                results.push((
+                                    write_key.to_vec(),
+                                    write_entry.e.value.to_vec(),
+                                    self.read_ts,
+                                    write_entry.e.ts,
+                                ));
+                            }
+                            write_set_iter.next();
+                        }
+                    }
                 }
-            }
-
-            // Remove null terminator from key for comparison
-            let mut clean_key = key.to_vec();
-            clean_key.truncate(clean_key.len() - 1);
-            let clean_key_slice = &clean_key[..];
-
-            // Skip if this key has been overwritten or deleted in write set
-            if self.write_set.contains_key(clean_key_slice)
-                || deleted_keys.contains(clean_key_slice)
-            {
-                continue;
-            }
-
-            // Add to read set for conflict detection
-            let key_bytes = Bytes::copy_from_slice(&clean_key);
-            let entry = ReadSetEntry::new(key_bytes, *version, self.savepoints);
-            self.read_set.push(entry);
+                (Some(&(ref snap_key, ref snap_value, ref snap_version, ref snap_ts)), None) => {
+                    // Only snapshot entries remaining
+                    if **snap_version <= self.read_ts {
+                        let clean_snap_key = &snap_key[..snap_key.len() - 1];
+
+                        // Apply filters
+                        let mut skip = false;
+                        for filter in &FILTERS {
+                            if filter.apply(snap_value).is_err() {
+                                skip = true;
+                                break;
+                            }
+                        }
 
-            if let std::collections::btree_map::Entry::Vacant(e) = results_map.entry(clean_key) {
-                if let Ok(resolved_value) = value.resolve(&self.core) {
-                    e.insert((resolved_value, *version, *ts));
+                        if !skip {
+                            // Add to read set for conflict detection
+                            let key_bytes = Bytes::copy_from_slice(clean_snap_key);
+                            self.read_set.push(ReadSetEntry::new(
+                                key_bytes,
+                                **snap_version,
+                                self.savepoints,
+                            ));
+
+                            // Add to results
+                            if let Ok(resolved_value) = snap_value.resolve(&self.core) {
+                                results.push((
+                                    clean_snap_key.to_vec(),
+                                    resolved_value,
+                                    **snap_version,
+                                    **snap_ts,
+                                ));
+                            }
+                        }
+                    }
+                    snap_iter.next();
+                }
+                (None, Some((ref write_key, ref write_entry))) => {
+                    // Only write set entries remaining
+                    if !write_entry.e.is_deleted_or_tombstone() {
+                        results.push((
+                            write_key.to_vec(),
+                            write_entry.e.value.to_vec(),
+                            self.read_ts,
+                            write_entry.e.ts,
+                        ));
+                    }
+                    write_set_iter.next();
                 }
+                (None, None) => break,
             }
         }
 
-        // Convert the map to a vector of results
-        let mut results: Vec<(Vec<u8>, Vec<u8>, u64, u64)> = results_map
-            .into_iter()
-            .map(|(k, (v, ver, ts))| (k, v, ver, ts))
-            .collect();
-
-        // Apply limit if specified
-        if let Some(limit) = limit {
-            results.truncate(limit);
-        }
-
         Ok(results)
     }
 
@@ -489,13 +549,13 @@ impl Transaction {
             Bound::Excluded(start) => key > *start,
             Bound::Unbounded => true,
         };
-        
+
         let end_exclusive = match range.end_bound() {
             Bound::Included(end) => key <= *end,
             Bound::Excluded(end) => key < *end,
             Bound::Unbounded => true,
         };
-        
+
         start_inclusive && end_exclusive
     }
 
@@ -601,6 +661,52 @@ impl Transaction {
     }
 }
 
+struct WriteSetRangeIterator<'a> {
+    // Use a sorted vec of references instead of direct hash map iteration
+    entries: Vec<(&'a Bytes, &'a WriteSetEntry)>,
+    current: usize,
+}
+
+impl<'a> WriteSetRangeIterator<'a> {
+    fn new<R>(write_set: &'a HashMap<Bytes, Vec<WriteSetEntry>>, range: R) -> Self
+    where
+        R: RangeBounds<&'a [u8]>,
+    {
+        // Collect and sort all valid entries up front
+        let mut entries: Vec<_> = write_set
+            .iter()
+            .filter_map(|(key, entries)| {
+                if !Transaction::is_key_in_range(key, &range) {
+                    return None;
+                }
+                entries.last().map(|entry| (key, entry))
+            })
+            .collect();
+
+        // Sort by key
+        entries.sort_by(|a, b| a.0.cmp(b.0));
+
+        Self {
+            entries,
+            current: 0,
+        }
+    }
+}
+
+impl<'a> Iterator for WriteSetRangeIterator<'a> {
+    type Item = (&'a Bytes, &'a WriteSetEntry);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.current < self.entries.len() {
+            let result = self.entries[self.current];
+            self.current += 1;
+            Some(result)
+        } else {
+            None
+        }
+    }
+}
+
 /// Implement savepoints logic for transactions.
 impl Transaction {
     /// After calling this method the subsequent modifications within this
@@ -3247,11 +3353,11 @@ mod tests {
 
         // Verify the results
        assert_eq!(results.len(), 3);
-        assert_eq!(results[0].0, key1);
+        assert_eq!(results[0].0, key1.to_vec());
         assert_eq!(results[0].1, value1);
-        assert_eq!(results[1].0, key2);
+        assert_eq!(results[1].0, key2.to_vec());
         assert_eq!(results[1].1, value2);
-        assert_eq!(results[2].0, key3);
+        assert_eq!(results[2].0, key3.to_vec());
         assert_eq!(results[2].1, value3);
     }