Skip to content

Commit

Permalink
Make snapshot fully responsible for the visibility of keys
Browse files Browse the repository at this point in the history
  • Loading branch information
gsserge committed Dec 10, 2024
1 parent cd17e78 commit aaf0637
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 130 deletions.
5 changes: 5 additions & 0 deletions src/storage/kv/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ impl IndexValue {
}
}

pub(crate) fn deleted(&self) -> bool {
self.metadata()
.is_some_and(|md| md.is_deleted_or_tombstone())
}

pub(crate) fn segment_id(&self) -> u64 {
match self {
Self::Disk(e) => e.segment_id,
Expand Down
42 changes: 15 additions & 27 deletions src/storage/kv/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,12 @@ impl SnapshotIsolation {
// Check write conflicts
for key in txn.write_set.keys() {
if let Some(last_entry) = txn.write_set.get(key).and_then(|entries| entries.last()) {
match current_snapshot.get(&key[..].into()) {
Ok((_, version)) => {
// Detect if another transaction has written to this key
if version > last_entry.version {
return Err(Error::TransactionWriteConflict);
}
}
Err(Error::KeyNotFound) => {
continue;
}
Err(e) => return Err(e),
// Detect if another transaction has written to this key.
if current_snapshot
.get(&key[..].into())
.is_some_and(|(_, version)| version > last_entry.version)
{
return Err(Error::TransactionWriteConflict);
}
}
}
Expand Down Expand Up @@ -223,35 +218,28 @@ impl SerializableSnapshotIsolation {
// Check read conflicts
for entry in txn.read_set.iter() {
match current_snapshot.get(&entry.key[..].into()) {
Ok((_, version)) => {
Some((_, version)) => {
if entry.ts != version {
return Err(Error::TransactionReadConflict);
}
}
Err(Error::KeyNotFound) => {
None => {
if entry.ts > 0 {
return Err(Error::TransactionReadConflict);
}
continue;
}
Err(e) => return Err(e),
}
}

// Check write conflicts
for key in txn.write_set.keys() {
if let Some(last_entry) = txn.write_set.get(key).and_then(|entries| entries.last()) {
match current_snapshot.get(&key[..].into()) {
Ok((_, version)) => {
// Detect if another transaction has written to this key
if version > last_entry.version {
return Err(Error::TransactionWriteConflict);
}
}
Err(Error::KeyNotFound) => {
continue;
}
Err(e) => return Err(e),
// Detect if another transaction has written to this key.
if current_snapshot
.get(&key[..].into())
.is_some_and(|(_, version)| version > last_entry.version)
{
return Err(Error::TransactionWriteConflict);
}
}
}
Expand Down Expand Up @@ -284,7 +272,7 @@ impl SerializableSnapshotIsolation {
if let Some(entry) = entries.last() {
let key = VariableSizeKey::from_slice(key);
let res = current_snapshot.get(&key);
if entry.e.is_deleted_or_tombstone() && res.is_err() {
if entry.e.is_deleted_or_tombstone() && res.is_none() {
// This is a delete of a key that didn't exist at snapshot time
return Err(Error::TransactionWriteConflict);
}
Expand Down
102 changes: 40 additions & 62 deletions src/storage/kv/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use crate::storage::{

use vart::{art::QueryType, art::Tree, iter::Iter, VariableSizeKey};

pub(crate) const FILTERS: [fn(&IndexValue) -> Result<()>; 1] = [ignore_deleted];

/// A versioned snapshot for snapshot isolation.
pub(crate) struct Snapshot {
snap: Tree<VariableSizeKey, IndexValue>,
Expand Down Expand Up @@ -48,50 +46,40 @@ impl Snapshot {
.expect("incorrect snapshot version");
}

#[allow(unused)]
pub(crate) fn delete(&mut self, key: &VariableSizeKey) -> bool {
self.snap.remove(key)
}

fn apply_filters(&self, val: IndexValue, filters: &[impl FilterFn]) -> Result<IndexValue> {
for filter in filters {
filter.apply(&val)?;
}

Ok(val)
}

/// Retrieves the latest value associated with the given key from the snapshot.
pub(crate) fn get(&self, key: &VariableSizeKey) -> Result<(IndexValue, u64)> {
let (snap_val, version, _) = self.snap.get(key, self.version).ok_or(Error::KeyNotFound)?;
let val = self.apply_filters(snap_val, &FILTERS)?;
Ok((val, version))
pub(crate) fn get(&self, key: &VariableSizeKey) -> Option<(IndexValue, u64)> {
self.snap
.get(key, self.version)
.filter(|(val, _, _)| !val.deleted())
.map(|(val, version, _)| (val, version))
}

/// Retrieves the value associated with the given key at the given timestamp from the snapshot.
pub(crate) fn get_at_ts(&self, key: &VariableSizeKey, ts: u64) -> Result<IndexValue> {
let (val, _, _) = self.snap.get_at_ts(key, ts).ok_or(Error::KeyNotFound)?;
self.apply_filters(val, &FILTERS)
pub(crate) fn get_at_ts(&self, key: &VariableSizeKey, ts: u64) -> Option<IndexValue> {
self.snap
.get_at_ts(key, ts)
.filter(|(val, _, _)| !val.deleted())
.map(|(val, _, _)| val)
}

/// Retrieves the version history of the value associated with the given key from the snapshot.
pub(crate) fn get_version_history(
&self,
key: &VariableSizeKey,
) -> Result<Vec<(IndexValue, u64)>> {
let mut results = Vec::new();
) -> Option<Vec<(IndexValue, u64)>> {
let items = self.snap.get_version_history(key)?;

let items = self
.snap
.get_version_history(key)
.ok_or(Error::KeyNotFound)?;
let result = items
.into_iter()
.filter(|(val, _, _)| !val.deleted())
.map(|(value, _, ts)| (value, ts))
.collect();

for (value, _, ts) in items {
let result = self.apply_filters(value, &FILTERS)?;
results.push((result, ts));
}

Ok(results)
Some(result)
}

/// Retrieves an iterator over the key-value pairs in the snapshot.
Expand All @@ -100,19 +88,31 @@ impl Snapshot {
self.snap.iter()
}

/// Returns a range query iterator over the Trie.
/// Returns a range query iterator over the Trie without deleted keys.
pub(crate) fn range<'a, R>(
&'a self,
range: R,
) -> impl Iterator<Item = (Vec<u8>, &'a IndexValue, &'a u64, &'a u64)>
where
R: RangeBounds<VariableSizeKey> + 'a,
{
self.snap
.range(range)
.filter(|(_, snap_val, _, _)| !snap_val.deleted())
}

/// Returns a range query iterator over the Trie including deleted keys.
pub(crate) fn range_with_deleted<'a, R>(
&'a self,
range: R,
) -> impl Iterator<Item = (Vec<u8>, &'a IndexValue, &'a u64, &'a u64)>
where
R: RangeBounds<VariableSizeKey> + 'a,
{
self.snap.range(range)
}

/// Returns a versioned range query iterator over the Trie.
#[allow(unused)]
pub(crate) fn range_with_versions<'a, R>(
&'a self,
range: R,
Expand All @@ -127,20 +127,21 @@ impl Snapshot {
&self,
key: &VariableSizeKey,
query_type: QueryType,
) -> Result<(IndexValue, u64, u64)> {
let (idx_val, version, ts) = self
.snap
) -> Option<(IndexValue, u64, u64)> {
self.snap
.get_value_by_query(key, query_type)
.ok_or(Error::KeyNotFound)?;
let filtered_val = self.apply_filters(idx_val, &FILTERS)?;
Ok((filtered_val, version, ts))
.filter(|(val, _, _)| !val.deleted())
}

pub(crate) fn scan_at_ts<R>(&self, range: R, ts: u64) -> Vec<(Vec<u8>, IndexValue)>
where
R: RangeBounds<VariableSizeKey>,
{
self.snap.scan_at_ts(range, ts)
self.snap
.scan_at_ts(range, ts)
.into_iter()
.filter(|(_, snap_val)| !snap_val.deleted())
.collect()
}

pub(crate) fn keys_at_ts<R>(&self, range: R, ts: u64) -> Vec<Vec<u8>>
Expand All @@ -151,29 +152,6 @@ impl Snapshot {
}
}

pub(crate) trait FilterFn {
fn apply(&self, val: &IndexValue) -> Result<()>;
}

fn ignore_deleted(val: &IndexValue) -> Result<()> {
let md = val.metadata();
if let Some(md) = md {
if md.is_deleted_or_tombstone() {
return Err(Error::KeyNotFound);
}
}
Ok(())
}

impl<F> FilterFn for F
where
F: Fn(&IndexValue) -> Result<()>,
{
fn apply(&self, val: &IndexValue) -> Result<()> {
self(val)
}
}

#[cfg(test)]
mod tests {
use crate::storage::kv::option::Options;
Expand Down
Loading

0 comments on commit aaf0637

Please sign in to comment.