Skip to content

Commit

Permalink
feat: add keys api
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Jan 7, 2025
1 parent 4b7ff37 commit b5cea5e
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 8 deletions.
53 changes: 47 additions & 6 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,37 @@ impl Snapshot {
.collect()
}

pub(crate) fn keys_at_ts<'a, R>(&'a self, range: R, ts: u64) -> Vec<&'a [u8]>
pub(crate) fn keys_at_ts<'a, R>(
&'a self,
range: R,
ts: u64,
limit: Option<usize>,
) -> Vec<&'a [u8]>
where
R: RangeBounds<VariableSizeKey> + 'a,
{
self.snap.keys_at_ts(range, ts).collect()
let iter = self.snap.keys_at_ts(range, ts);
match limit {
Some(n) => iter.take(n).collect(),
None => iter.collect(),
}
}

/// Returns just the keys in the given range.
pub(crate) fn range_keys<'a, R>(&'a self, range: R, limit: Option<usize>) -> Vec<&'a [u8]>
where
R: RangeBounds<VariableSizeKey> + 'a,
{
let base_iter = self
.snap
.range(range)
.filter(|(_, snap_val, _, _)| !snap_val.deleted())
.map(|(key, _, _, _)| key);

match limit {
Some(n) => base_iter.take(n).collect(),
None => base_iter.collect(),
}
}
}

Expand All @@ -159,12 +185,20 @@ mod tests {
txn.commit().await.expect("Failed to commit transaction");
}

#[tokio::test]
async fn test_versioned_apis() {
// Common setup logic for creating a store
fn create_store() -> (Store, TempDir) {
let temp_dir = create_temp_directory();
let mut opts = Options::new();
opts.dir = temp_dir.path().to_path_buf();
let store = Store::new(opts).expect("Failed to create store");
(
Store::new(opts.clone()).expect("should create store"),
temp_dir,
)
}

#[tokio::test]
async fn test_versioned_apis() {
let (store, _) = create_store();

// Define multiple keys and their versioned values
let keys_values = [
Expand Down Expand Up @@ -194,11 +228,18 @@ mod tests {

let range = "k1".as_bytes()..="k2".as_bytes();
let keys = txn
.keys_at_ts(range.clone(), ts)
.keys_at_ts(range.clone(), ts, None)
.expect("Failed to get keys at timestamp");
assert_eq!(keys[0], b"k1");
assert_eq!(keys[1], b"k2");

// Check if limit works correctly
let keys = txn
.keys_at_ts(range.clone(), ts, Some(1))
.expect("Failed to get keys at timestamp");
assert_eq!(keys.len(), 1);
assert_eq!(keys[0], b"k1");

// Test scan_at_ts
let entries = txn
.scan_at_ts(range, ts, Some(10))
Expand Down
99 changes: 97 additions & 2 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,15 +820,20 @@ impl Transaction {
}

/// Returns keys within the specified range, at the given timestamp.
pub fn keys_at_ts<'b, R>(&'b self, range: R, ts: u64) -> Result<Vec<&'b [u8]>>
pub fn keys_at_ts<'b, R>(
&'b self,
range: R,
ts: u64,
limit: Option<usize>,
) -> Result<Vec<&'b [u8]>>
where
R: RangeBounds<&'b [u8]>,
{
self.ensure_read_only_transaction()?;

// Convert the range to a tuple of bounds of variable keys.
let range = convert_range_bounds(&range);
let keys = self.snapshot.as_ref().unwrap().keys_at_ts(range, ts);
let keys = self.snapshot.as_ref().unwrap().keys_at_ts(range, ts, limit);

Ok(keys)
}
Expand Down Expand Up @@ -935,6 +940,20 @@ impl Transaction {
pub(crate) fn get_versionstamp(&self) -> Option<(u64, u64)> {
self.versionstamp
}

/// Returns only keys within the specified range.
pub fn keys<'b, R>(&'b self, range: R, limit: Option<usize>) -> Result<Vec<&'b [u8]>>
where
R: RangeBounds<&'b [u8]>,
{
self.ensure_read_only_transaction()?;

// Convert the range to a tuple of bounds of variable keys.
let range = convert_range_bounds(&range);
let keys = self.snapshot.as_ref().unwrap().range_keys(range, limit);

Ok(keys)
}
}

impl Drop for Transaction {
Expand Down Expand Up @@ -3569,4 +3588,80 @@ mod tests {
// Drop the store to simulate closing it
store.close().await.unwrap();
}

#[tokio::test]
async fn keys_with_tombstones_with_limit() {
for is_ssi in [false, true] {
let (store, _) = create_store(is_ssi);

let key1 = Bytes::from("k1");
let key2 = Bytes::from("k2");
let value = Bytes::from("v");

// First, insert the keys.
let mut txn1 = store.begin().unwrap();
txn1.set(&key1, &value).unwrap();
txn1.set(&key2, &value).unwrap();
txn1.commit().await.unwrap();

// Then, soft-delete them.
let mut txn2 = store.begin().unwrap();
txn2.soft_delete(&key1).unwrap();
txn2.soft_delete(&key2).unwrap();
txn2.commit().await.unwrap();

// keys_with_tombstones() should still return `k1` and `k2`
// despite them being soft-deleted.
let range = "k1".as_bytes()..="k2".as_bytes();
let txn3 = store.begin().unwrap();
let results = txn3.keys_with_tombstones(range.clone(), None).unwrap();
assert_eq!(results, vec![Box::from(&b"k1"[..]), Box::from(&b"k2"[..])]);

// Check if the limit works correctly.
let txn4 = store.begin().unwrap();
let limited_results = txn4.keys_with_tombstones(range, Some(1)).unwrap();
assert_eq!(limited_results.len(), 1);
assert_eq!(limited_results, vec![Box::from(&b"k1"[..])]);
}
}

#[tokio::test]
async fn test_keys_function() {
for is_ssi in [false, true] {
let (store, _) = create_store(is_ssi);

let key1 = Bytes::from("k1");
let key2 = Bytes::from("k2");
let key3 = Bytes::from("k3");
let value = Bytes::from("v");

// Insert the keys.
let mut txn1 = store.begin().unwrap();
txn1.set(&key1, &value).unwrap();
txn1.set(&key2, &value).unwrap();
txn1.set(&key3, &value).unwrap();
txn1.commit().await.unwrap();

// Soft-delete key2.
let mut txn2 = store.begin().unwrap();
txn2.soft_delete(&key2).unwrap();
txn2.commit().await.unwrap();

// Test the keys function without a limit.
let range = "k1".as_bytes()..="k3".as_bytes();
let txn3 = store.begin_with_mode(Mode::ReadOnly).unwrap();
let results = txn3.keys(range.clone(), None).unwrap();
assert_eq!(results, vec![&b"k1"[..], &b"k3"[..]]);

// Test the keys function with a limit of 2.
let txn4 = store.begin_with_mode(Mode::ReadOnly).unwrap();
let limited_results = txn4.keys(range.clone(), Some(2)).unwrap();
assert_eq!(limited_results, vec![&b"k1"[..], &b"k3"[..]]);

// Test the keys function with a limit of 1.
let txn5 = store.begin_with_mode(Mode::ReadOnly).unwrap();
let limited_results = txn5.keys(range, Some(1)).unwrap();
assert_eq!(limited_results, vec![&b"k1"[..]]);
}
}
}

0 comments on commit b5cea5e

Please sign in to comment.