diff --git a/src/storage/kv/snapshot.rs b/src/storage/kv/snapshot.rs index 9cfec6d..48232f5 100644 --- a/src/storage/kv/snapshot.rs +++ b/src/storage/kv/snapshot.rs @@ -1,14 +1,12 @@ use std::ops::RangeBounds; +use vart::{art::QueryType, art::Tree, iter::Iter, VariableSizeKey}; use crate::storage::{ kv::error::{Error, Result}, kv::indexer::IndexValue, kv::store::Core, - kv::util::now, }; -use vart::{art::QueryType, art::Tree, iter::Iter, VariableSizeKey}; - /// A versioned snapshot for snapshot isolation. pub(crate) struct Snapshot { snap: Tree, @@ -39,17 +37,6 @@ impl Snapshot { }) } - /// Set a key-value pair into the snapshot. - pub(crate) fn set(&mut self, key: &VariableSizeKey, value: IndexValue) { - self.snap - .insert(key, value, self.version, now()) - .expect("incorrect snapshot version"); - } - - pub(crate) fn delete(&mut self, key: &VariableSizeKey) -> bool { - self.snap.remove(key) - } - /// Retrieves the latest value associated with the given key from the snapshot. pub(crate) fn get(&self, key: &VariableSizeKey) -> Option<(IndexValue, u64)> { self.snap @@ -59,11 +46,11 @@ impl Snapshot { } /// Retrieves the value associated with the given key at the given timestamp from the snapshot. - pub(crate) fn get_at_ts(&self, key: &VariableSizeKey, ts: u64) -> Option { + pub(crate) fn get_at_ts(&self, key: &VariableSizeKey, ts: u64) -> Option<(IndexValue, u64)> { self.snap .get_at_ts(key, ts) .filter(|(val, _, _)| !val.deleted()) - .map(|(val, _, _)| val) + .map(|(val, _, ts)| (val, ts)) } /// Retrieves the version history of the value associated with the given key from the snapshot. 
@@ -201,7 +188,7 @@ mod tests { // Test keys_at_ts let ts = now(); - let txn = store + let mut txn = store .begin_with_mode(Mode::ReadOnly) .expect("Failed to begin transaction"); diff --git a/src/storage/kv/transaction.rs b/src/storage/kv/transaction.rs index 336ff8c..418541e 100644 --- a/src/storage/kv/transaction.rs +++ b/src/storage/kv/transaction.rs @@ -1,19 +1,20 @@ -use std::ops::{Bound, RangeBounds}; -use std::sync::Arc; - -use ahash::{HashMap, HashMapExt}; use ahash::{HashSet, HashSetExt}; use bytes::Bytes; -use std::collections::hash_map::Entry as HashEntry; +use std::cmp::Ordering; +use std::collections::btree_map::Entry as BTreeEntry; +use std::collections::BTreeMap; +use std::ops::{Bound, RangeBounds}; +use std::sync::Arc; use vart::{art::QueryType, VariableSizeKey}; use crate::storage::kv::{ entry::Entry, error::{Error, Result}, indexer::IndexValue, + option::IsolationLevel, snapshot::Snapshot, store::Core, - util::{convert_range_bounds, now}, + util::{convert_range_bounds, convert_range_bounds_bytes, now}, }; /// `Mode` is an enumeration representing the different modes a transaction can have in an MVCC (Multi-Version Concurrency Control) system. @@ -106,7 +107,8 @@ pub(crate) struct ReadSetEntry { } impl ReadSetEntry { - pub(crate) fn new(key: Bytes, ts: u64, savepoint_no: u32) -> Self { + pub(crate) fn new(key: &[u8], ts: u64, savepoint_no: u32) -> Self { + let key = Bytes::copy_from_slice(key); Self { key, ts, @@ -132,6 +134,9 @@ impl ReadScanEntry { } } +type ReadSet = Vec; +type WriteSet = BTreeMap>; + /// `Transaction` is a struct representing a transaction in a database. pub struct Transaction { /// `read_ts` is the read timestamp of the transaction. This is the time at which the transaction started. @@ -151,10 +156,14 @@ pub struct Transaction { /// These are the changes that the transaction intends to make to the data. /// The entries vec is used to keep different values for the same key for /// savepoints and rollbacks. 
- pub(crate) write_set: HashMap>, + pub(crate) write_set: WriteSet, + + /// Controls whether read keys and ranges should be stored in the read_set and read_key_ranges respectively. + /// Needed only when the transaction is using Serializable Snapshot Isolation. + track_reads: bool, /// `read_set` is the keys that are read in the transaction from the snapshot. This is used for conflict detection. - pub(crate) read_set: Vec, + pub(crate) read_set: ReadSet, /// `read_key_ranges` is the key ranges that are read in the transaction from the snapshot. This is used for conflict detection. pub(crate) read_key_ranges: Vec, @@ -189,12 +198,19 @@ impl Transaction { snapshot = Some(snap); } + // We only need to track reads for SSI. + let track_reads = matches!( + core.opts.isolation_level, + IsolationLevel::SerializableSnapshotIsolation + ); + Ok(Self { read_ts, mode, snapshot, core, - write_set: HashMap::new(), + write_set: BTreeMap::new(), + track_reads, read_set: Vec::new(), read_key_ranges: Vec::new(), durability: Durability::Eventual, @@ -288,24 +304,19 @@ impl Transaction { } // RYOW semantics: Read your own writes. If the value is in the write set, return it. - if let Some(last_entry) = self.write_set.get(key).and_then(|entries| entries.last()) { - let val = if last_entry.e.is_deleted_or_tombstone() { - None - } else { - Some(last_entry.e.value.clone().to_vec()) - }; - return Ok(val); + match self.get_in_write_set(key) { + Some((Some(val), _)) => return Ok(Some(val.to_vec())), + Some((None, _)) => return Ok(None), // Delete in the write set + None => {} } // The value is not in the write set, so attempt to get it from the snapshot. match self.snapshot.as_ref().unwrap().get(&key[..].into()) { Some((val, version)) => { - // If the transaction is not read-only and the value reference has a version greater than 0, - // add the key and its version to the read set for conflict detection. 
- if !self.mode.is_read_only() && version > 0 { - let key = Bytes::copy_from_slice(key); - let entry = ReadSetEntry::new(key, version, self.savepoints); - self.read_set.push(entry); + // If the transaction is not read-only add the key and its version + // to the read set for conflict detection. + if self.track_reads && !self.mode.is_read_only() { + Self::add_to_read_set(&mut self.read_set, key, version, self.savepoints); } // Resolve the value reference to get the actual value. @@ -314,10 +325,8 @@ impl Transaction { None => { // If the key is not found in the index, and the transaction is not read-only, // add the key to the read set with a timestamp of 0. - if !self.mode.is_read_only() { - let key = Bytes::copy_from_slice(key); - let entry = ReadSetEntry::new(key, 0, self.savepoints); - self.read_set.push(entry); + if self.track_reads && !self.mode.is_read_only() { + Self::add_to_read_set(&mut self.read_set, key, 0, self.savepoints); } Ok(None) } @@ -339,23 +348,12 @@ impl Transaction { return Err(Error::EmptyKey); } - // If the transaction mode is not write-only, update the snapshot. - if !self.mode.is_write_only() { - let index_value = IndexValue::new_mem(e.metadata.clone(), e.value.clone()); - - // Set the key-value pair in the snapshot. - self.snapshot - .as_mut() - .unwrap() - .set(&e.key[..].into(), index_value); - } - // Set the transaction's latest savepoint number and add it to the write set. 
let key = e.key.clone(); let write_seqno = self.next_write_seqno(); let ws_entry = WriteSetEntry::new(e, self.savepoints, write_seqno, self.read_ts); match self.write_set.entry(key) { - HashEntry::Occupied(mut oe) => { + BTreeEntry::Occupied(mut oe) => { let entries = oe.get_mut(); // If the latest existing value for this key belongs to the same // savepoint as the value we are about to write, then we can @@ -370,7 +368,7 @@ impl Transaction { entries.push(ws_entry) } } - HashEntry::Vacant(ve) => { + BTreeEntry::Vacant(ve) => { ve.insert(vec![ws_entry]); } }; @@ -378,51 +376,140 @@ impl Transaction { Ok(()) } - /// Scans a range of keys and returns a vector of tuples containing the value, version, and timestamp for each key. - pub fn scan<'b, R>(&'b mut self, range: R, limit: Option) -> Result> - where - R: RangeBounds<&'b [u8]>, - { - // Convert the range to a tuple of bounds of variable keys. - let range: (Bound, Bound) = convert_range_bounds(range); + fn add_to_read_set(read_set: &mut ReadSet, key: &[u8], version: u64, savepoints: u32) { + let entry = ReadSetEntry::new(key, version, savepoints); + read_set.push(entry); + } - // Keep track of the range bound predicates for conflict detection in case of SSI. - let rs_entry = ReadScanEntry::new(range.clone(), self.savepoints); - self.read_key_ranges.push(rs_entry); + // Returning `Some((None, ts))` means that the value has been deleted by this transaction. + fn get_in_write_set(&self, key: &[u8]) -> Option<(Option, u64)> { + if let Some(last_entry) = self.write_set.get(key).and_then(|entries| entries.last()) { + if last_entry.e.is_deleted_or_tombstone() { + Some((None, last_entry.e.ts)) + } else { + Some((Some(last_entry.e.value.clone()), last_entry.e.ts)) + } + } else { + None + } + } - // Initialize an empty vector to store the results. + // This is a helper method to perform merging scan in both the write set and the snapshot, + // since the writes are not duplicated in the transaction's snapshot anymore. 
+ fn merging_scan<'b, R, I>( + core: &Core, + write_set: &WriteSet, + mut read_set: Option<&mut ReadSet>, + savepoints: u32, + snap_iter: I, + range: &R, + limit: Option, + ) -> Result> + where + R: RangeBounds, + I: Iterator, &'b IndexValue, u64, u64)>, + { + let mut snap_iter = snap_iter.peekable(); + let range_bytes = convert_range_bounds_bytes(range); + let mut write_set_iter = write_set.range(range_bytes).peekable(); + let limit = limit.unwrap_or(usize::MAX); let mut results = Vec::new(); - // Get a range iterator for the specified range. - let snap = self.snapshot.as_ref().unwrap(); - let ranger = snap.range(range); - - // Iterate over the keys in the range. - for (key, value, version, ts) in ranger { - // If a limit is set and we've already got enough results, break the loop. - if let Some(limit) = limit { + if write_set_iter.peek().is_none() { + // If the write set does not contain values in the scan range, + // do the scan only in the snapshot. This optimisation is quite + // important according to the benches. + for (key, value, version, ts) in snap_iter.by_ref() { if results.len() >= limit { break; } + if let Some(read_set) = read_set.as_mut() { + Self::add_to_read_set(read_set, &key, version, savepoints); + } + let v = value.resolve(core)?; + results.push((key, v, ts)); } + } else { + // If both the write set and the snapshot contain values from the requested + // range, perform a somewhat slower merging scan. + loop { + if results.len() >= limit { + break; + } + + let snap_entry = snap_iter.peek(); + let ws_entry = write_set_iter.peek(); + + // Determine where to pick the next value from, + // the write set or the snapshot. 
+ let from_snap = match (snap_entry, ws_entry) { + (None, None) => break, + (Some(_), None) => true, + (None, Some(_)) => false, + (Some((snap_key, _, _, _)), Some((ws_key, _))) => { + match snap_key.as_slice().cmp(ws_key.as_ref()) { + Ordering::Less => true, + Ordering::Greater => false, + Ordering::Equal => { + snap_iter.next(); + false + } + } + } + }; - // Only add the key to the read set if the version is less than or equal to the - // read timestamp. This is to prevent adding keys that are added during the transaction. - if *version <= self.read_ts { - let key = Bytes::copy_from_slice(&key); - let entry = ReadSetEntry::new(key, *version, self.savepoints); - self.read_set.push(entry); + if from_snap { + let (key, value, version, ts) = snap_iter.next().unwrap(); + if let Some(read_set) = read_set.as_mut() { + Self::add_to_read_set(read_set, &key, version, savepoints); + } + let v = value.resolve(core)?; + results.push((key, v, ts)); + } else { + let (ws_key, ws_entries) = write_set_iter.next().unwrap(); + let ws_entry = ws_entries.last().unwrap(); + if !ws_entry.e.is_deleted_or_tombstone() { + results.push((ws_key.to_vec(), ws_entry.e.value.to_vec(), ws_entry.e.ts)); + } + } } + } - // Resolve the value reference to get the actual value. - let v = value.resolve(&self.core)?; + Ok(results) + } - // Add the key, value, and timestamp to the results vector. - results.push((key, v, *ts)); + /// Scans a range of keys and returns a vector of tuples containing the value, version, and timestamp for each key. + pub fn scan<'b, R>(&'b mut self, range: R, limit: Option) -> Result> + where + R: RangeBounds<&'b [u8]>, + { + // Convert the range to a tuple of bounds of variable keys. + let bound_range = convert_range_bounds(&range); + + // If enabled, keep track of the reads and range bound predicates for conflict detection + // in case of SSI. 
+ let mut read_set_for_scan = None; + if self.track_reads { + read_set_for_scan = Some(&mut self.read_set); + let rs_entry = ReadScanEntry::new(bound_range.clone(), self.savepoints); + self.read_key_ranges.push(rs_entry); } - // Return the results. - Ok(results) + // Get a snapshot iterator for the specified range. + let snap = self.snapshot.as_ref().unwrap(); + let snap_iter = snap + .range(bound_range.clone()) + .map(|(k, v, ver, ts)| (k, v, *ver, *ts)); + + Self::merging_scan( + &self.core, + &self.write_set, + read_set_for_scan, + self.savepoints, + snap_iter, + &bound_range, + limit, + ) } /// Returns all existing keys within the specified range, including soft-deleted @@ -433,7 +520,7 @@ impl Transaction { R: RangeBounds<&'b [u8]>, { // Convert the range to a tuple of bounds of variable keys. - let range = convert_range_bounds(range); + let range = convert_range_bounds(&range); let keys = self.snapshot.as_ref().unwrap().range_with_deleted(range); let result = keys.into_iter().map(|(key, _, _, _)| key).collect(); @@ -574,51 +661,24 @@ impl Transaction { // For every key in the write set, remove entries marked // for rollback since the last call to set_savepoint() - // from its vec and the snapshot. - // HashMap does not really like when items are removed, - // so let's just keep empty `entries` vectors around. + // from its vec. for entries in self.write_set.values_mut() { - let pre_retain_len = entries.len(); - entries.retain(|entry| { - if entry.savepoint_no == self.savepoints { - self.snapshot - .as_mut() - .unwrap() - .delete(&entry.e.key[..].into()); - false - } else { - true - } - }); - - // If the transaction's mode is write-only, or if no values for this - // key were removed due to the rollback, continue to the next key. - if pre_retain_len == entries.len() || self.mode.is_write_only() { - continue; - } - - // Otherwise, the value just before the rollback needs to be added - // back to the snapshot in order to keep scan() working. 
- if let Some(latest_entry) = entries.last() { - let index_value = IndexValue::new_mem( - latest_entry.e.metadata.clone(), - latest_entry.e.value.clone(), - ); - self.snapshot - .as_mut() - .unwrap() - .set(&latest_entry.e.key[..].into(), index_value); - } + entries.retain(|entry| entry.savepoint_no != self.savepoints); } - // Remove marked entries from the read set to - // prevent unnecessary read-write conflicts. - self.read_set - .retain(|entry| entry.savepoint_no != self.savepoints); + // Remove keys with no entries left after the rollback above. + self.write_set.retain(|_, entries| !entries.is_empty()); + + if self.track_reads { + // Remove marked entries from the read set to + // prevent unnecessary read-write conflicts. + self.read_set + .retain(|entry| entry.savepoint_no != self.savepoints); - // And also from the read scan set. - self.read_key_ranges - .retain(|entry| entry.savepoint_no != self.savepoints); + // And also from the read scan set. + self.read_key_ranges + .retain(|entry| entry.savepoint_no != self.savepoints); + } // Decrement the latest savepoint number unless it's zero. // Cannot undeflow due to the zero check above. @@ -650,19 +710,33 @@ impl Transaction { return Err(Error::EmptyKey); } - // Attempt to get the value for the key from the snapshot. - match self + // Consider the value from the write set only if its timestamp is lower than or equal + // to the requested timestamp `ts`. + let ws_val = self.get_in_write_set(key).filter(|(_, ws_ts)| *ws_ts <= ts); + + let snap_val = self .snapshot .as_ref() .unwrap() - .get_at_ts(&key[..].into(), ts) - { - Some(value) => { - // Resolve the value reference to get the actual value. - value.resolve(&self.core).map(Some) + .get_at_ts(&key[..].into(), ts); + + // Similar to `Transaction::merging_scan`, we have to pick where + // the value should come from, the write set or the snapshot. 
+ let result = match (snap_val, ws_val) { + (None, None) => None, + (Some(snap_val), None) => Some(snap_val.0.resolve(&self.core)?), + (None, Some((ws_val, _))) => ws_val.map(|v| v.to_vec()), + (Some(snap_val), Some((ws_val, ws_ts))) => { + assert!(snap_val.1 != ws_ts, "cannot overwrite historical values"); + if ws_ts > snap_val.1 { + ws_val.map(|v| v.to_vec()) + } else { + Some(snap_val.0.resolve(&self.core)?) + } } - None => Ok(None), - } + }; + + Ok(result) } /// Returns all the versioned values and timestamps associated with the key. @@ -700,7 +774,7 @@ impl Transaction { /// Returns key-value pairs within the specified range, at the given timestamp. pub fn scan_at_ts<'b, R>( - &'b self, + &'b mut self, range: R, ts: u64, limit: Option, @@ -709,25 +783,25 @@ impl Transaction { R: RangeBounds<&'b [u8]>, { // Convert the range to a tuple of bounds of variable keys. - let range = convert_range_bounds(range); - let items = self.snapshot.as_ref().unwrap().scan_at_ts(range, ts); - - let mut results = Vec::new(); - for (key, value) in items { - // If a limit is set and we've already got enough results, break the loop. - if let Some(limit) = limit { - if results.len() >= limit { - break; - } - } - - // Resolve the value reference to get the actual value. - let v = value.resolve(&self.core)?; - - results.push((key, v)); - } - - Ok(results) + let range = convert_range_bounds(&range); + let snap_values = self + .snapshot + .as_ref() + .unwrap() + .scan_at_ts(range.clone(), ts); + let snap_iter = snap_values.iter().map(|(k, v)| (k.clone(), v, 0, 0)); + + let results = Self::merging_scan( + &self.core, + &self.write_set, + None, + self.savepoints, + snap_iter, + &range, + limit, + )?; + let results_without_ts = results.into_iter().map(|(k, v, _)| (k, v)).collect(); + Ok(results_without_ts) } /// Returns keys within the specified range, at the given timestamp. 
@@ -738,7 +812,7 @@ impl Transaction { self.ensure_read_only_transaction()?; // Convert the range to a tuple of bounds of variable keys. - let range = convert_range_bounds(range); + let range = convert_range_bounds(&range); let keys = self.snapshot.as_ref().unwrap().keys_at_ts(range, ts); Ok(keys) @@ -780,7 +854,7 @@ impl Transaction { R: RangeBounds<&'b [u8]>, { // Convert the range to a tuple of bounds of variable keys. - let range = convert_range_bounds(range); + let range = convert_range_bounds(&range); // Initialize an empty vector to store the results. let mut results = Vec::new(); diff --git a/src/storage/kv/util.rs b/src/storage/kv/util.rs index edc97c9..f88b56e 100644 --- a/src/storage/kv/util.rs +++ b/src/storage/kv/util.rs @@ -1,11 +1,9 @@ -use std::{ - fs, - ops::{Bound, RangeBounds}, - path::{Path, PathBuf}, -}; - +use bytes::Bytes; use chrono::Utc; -use vart::VariableSizeKey; +use std::fs; +use std::ops::{Bound, RangeBounds}; +use std::path::{Path, PathBuf}; +use vart::{Key, VariableSizeKey}; use crate::Result; @@ -67,7 +65,7 @@ pub(crate) fn copy_dir_all(src: &Path, dst: &Path) -> Result<()> { } pub(crate) fn convert_range_bounds<'a, R>( - range: R, + range: &R, ) -> (Bound, Bound) where R: RangeBounds<&'a [u8]>, @@ -85,3 +83,20 @@ where }; (start_bound, end_bound) } + +pub(crate) fn convert_range_bounds_bytes(range: &R) -> (Bound, Bound) +where + R: RangeBounds, +{ + let start_bound = match range.start_bound() { + Bound::Included(start) => Bound::Included(Bytes::copy_from_slice(start.as_slice())), + Bound::Excluded(start) => Bound::Excluded(Bytes::copy_from_slice(start.as_slice())), + Bound::Unbounded => Bound::Unbounded, + }; + let end_bound = match range.end_bound() { + Bound::Included(end) => Bound::Included(Bytes::copy_from_slice(end.as_slice())), + Bound::Excluded(end) => Bound::Excluded(Bytes::copy_from_slice(end.as_slice())), + Bound::Unbounded => Bound::Unbounded, + }; + (start_bound, end_bound) +}