diff --git a/src/snapshot.rs b/src/snapshot.rs
index 03882b7..4cc645c 100644
--- a/src/snapshot.rs
+++ b/src/snapshot.rs
@@ -131,11 +131,37 @@ impl Snapshot {
             .collect()
     }

-    pub(crate) fn keys_at_ts<'a, R>(&'a self, range: R, ts: u64) -> Vec<&'a [u8]>
+    pub(crate) fn keys_at_ts<'a, R>(
+        &'a self,
+        range: R,
+        ts: u64,
+        limit: Option<usize>,
+    ) -> Vec<&'a [u8]>
     where
         R: RangeBounds<VariableSizeKey> + 'a,
     {
-        self.snap.keys_at_ts(range, ts).collect()
+        let iter = self.snap.keys_at_ts(range, ts);
+        match limit {
+            Some(n) => iter.take(n).collect(),
+            None => iter.collect(),
+        }
+    }
+
+    /// Returns just the keys in the given range.
+    pub(crate) fn range_keys<'a, R>(&'a self, range: R, limit: Option<usize>) -> Vec<&'a [u8]>
+    where
+        R: RangeBounds<VariableSizeKey> + 'a,
+    {
+        let base_iter = self
+            .snap
+            .range(range)
+            .filter(|(_, snap_val, _, _)| !snap_val.deleted())
+            .map(|(key, _, _, _)| key);
+
+        match limit {
+            Some(n) => base_iter.take(n).collect(),
+            None => base_iter.collect(),
+        }
     }
 }

@@ -159,12 +185,20 @@ mod tests {
         txn.commit().await.expect("Failed to commit transaction");
     }

-    #[tokio::test]
-    async fn test_versioned_apis() {
+    // Common setup logic for creating a store
+    fn create_store() -> (Store, TempDir) {
         let temp_dir = create_temp_directory();
         let mut opts = Options::new();
         opts.dir = temp_dir.path().to_path_buf();
-        let store = Store::new(opts).expect("Failed to create store");
+        (
+            Store::new(opts.clone()).expect("should create store"),
+            temp_dir,
+        )
+    }
+
+    #[tokio::test]
+    async fn test_versioned_apis() {
+        let (store, _) = create_store();

         // Define multiple keys and their versioned values
         let keys_values = [
@@ -194,11 +228,18 @@ mod tests {

         let range = "k1".as_bytes()..="k2".as_bytes();
         let keys = txn
-            .keys_at_ts(range.clone(), ts)
+            .keys_at_ts(range.clone(), ts, None)
             .expect("Failed to get keys at timestamp");
         assert_eq!(keys[0], b"k1");
         assert_eq!(keys[1], b"k2");

+        // Check that the limit works correctly
+        let keys = txn
+            .keys_at_ts(range.clone(), ts, Some(1))
+            .expect("Failed to get keys at timestamp");
+        assert_eq!(keys.len(), 1);
+        assert_eq!(keys[0], b"k1");
+
         // Test scan_at_ts
         let entries = txn
             .scan_at_ts(range, ts, Some(10))
diff --git a/src/transaction.rs b/src/transaction.rs
index d83a922..3672716 100644
--- a/src/transaction.rs
+++ b/src/transaction.rs
@@ -820,7 +820,12 @@ impl Transaction {
     }

     /// Returns keys within the specified range, at the given timestamp.
-    pub fn keys_at_ts<'b, R>(&'b self, range: R, ts: u64) -> Result<Vec<&'b [u8]>>
+    pub fn keys_at_ts<'b, R>(
+        &'b self,
+        range: R,
+        ts: u64,
+        limit: Option<usize>,
+    ) -> Result<Vec<&'b [u8]>>
     where
         R: RangeBounds<&'b [u8]>,
     {
@@ -828,7 +833,7 @@ impl Transaction {

         // Convert the range to a tuple of bounds of variable keys.
         let range = convert_range_bounds(&range);
-        let keys = self.snapshot.as_ref().unwrap().keys_at_ts(range, ts);
+        let keys = self.snapshot.as_ref().unwrap().keys_at_ts(range, ts, limit);

         Ok(keys)
     }
@@ -935,6 +940,20 @@ impl Transaction {
     pub(crate) fn get_versionstamp(&self) -> Option<(u64, u64)> {
         self.versionstamp
     }
+
+    /// Returns only keys within the specified range.
+    pub fn keys<'b, R>(&'b self, range: R, limit: Option<usize>) -> Result<Vec<&'b [u8]>>
+    where
+        R: RangeBounds<&'b [u8]>,
+    {
+        self.ensure_read_only_transaction()?;
+
+        // Convert the range to a tuple of bounds of variable keys.
+        let range = convert_range_bounds(&range);
+        let keys = self.snapshot.as_ref().unwrap().range_keys(range, limit);
+
+        Ok(keys)
+    }
 }

 impl Drop for Transaction {
@@ -3569,4 +3588,80 @@ mod tests {
         // Drop the store to simulate closing it
         store.close().await.unwrap();
     }
+
+    #[tokio::test]
+    async fn keys_with_tombstones_with_limit() {
+        for is_ssi in [false, true] {
+            let (store, _) = create_store(is_ssi);
+
+            let key1 = Bytes::from("k1");
+            let key2 = Bytes::from("k2");
+            let value = Bytes::from("v");
+
+            // First, insert the keys.
+            let mut txn1 = store.begin().unwrap();
+            txn1.set(&key1, &value).unwrap();
+            txn1.set(&key2, &value).unwrap();
+            txn1.commit().await.unwrap();
+
+            // Then, soft-delete them.
+            let mut txn2 = store.begin().unwrap();
+            txn2.soft_delete(&key1).unwrap();
+            txn2.soft_delete(&key2).unwrap();
+            txn2.commit().await.unwrap();
+
+            // keys_with_tombstones() should still return `k1` and `k2`
+            // despite them being soft-deleted.
+            let range = "k1".as_bytes()..="k2".as_bytes();
+            let txn3 = store.begin().unwrap();
+            let results = txn3.keys_with_tombstones(range.clone(), None).unwrap();
+            assert_eq!(results, vec![Box::from(&b"k1"[..]), Box::from(&b"k2"[..])]);
+
+            // Check if the limit works correctly.
+            let txn4 = store.begin().unwrap();
+            let limited_results = txn4.keys_with_tombstones(range, Some(1)).unwrap();
+            assert_eq!(limited_results.len(), 1);
+            assert_eq!(limited_results, vec![Box::from(&b"k1"[..])]);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_keys_function() {
+        for is_ssi in [false, true] {
+            let (store, _) = create_store(is_ssi);
+
+            let key1 = Bytes::from("k1");
+            let key2 = Bytes::from("k2");
+            let key3 = Bytes::from("k3");
+            let value = Bytes::from("v");
+
+            // Insert the keys.
+            let mut txn1 = store.begin().unwrap();
+            txn1.set(&key1, &value).unwrap();
+            txn1.set(&key2, &value).unwrap();
+            txn1.set(&key3, &value).unwrap();
+            txn1.commit().await.unwrap();
+
+            // Soft-delete key2.
+            let mut txn2 = store.begin().unwrap();
+            txn2.soft_delete(&key2).unwrap();
+            txn2.commit().await.unwrap();
+
+            // Test the keys function without a limit.
+            let range = "k1".as_bytes()..="k3".as_bytes();
+            let txn3 = store.begin_with_mode(Mode::ReadOnly).unwrap();
+            let results = txn3.keys(range.clone(), None).unwrap();
+            assert_eq!(results, vec![&b"k1"[..], &b"k3"[..]]);
+
+            // Test the keys function with a limit of 2.
+            let txn4 = store.begin_with_mode(Mode::ReadOnly).unwrap();
+            let limited_results = txn4.keys(range.clone(), Some(2)).unwrap();
+            assert_eq!(limited_results, vec![&b"k1"[..], &b"k3"[..]]);
+
+            // Test the keys function with a limit of 1.
+            let txn5 = store.begin_with_mode(Mode::ReadOnly).unwrap();
+            let limited_results = txn5.keys(range, Some(1)).unwrap();
+            assert_eq!(limited_results, vec![&b"k1"[..]]);
+        }
+    }
 }
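
For reviewers, a rough caller-side sketch of how the limit-aware APIs added by this diff are intended to be used. It is not part of the patch: it reuses only items that already appear in the tests above (Store, Options via begin()/begin_with_mode(Mode::ReadOnly), Bytes), while the function name `list_keys_example` and the `ts` argument are made up for illustration.

// Illustrative usage only; assumes the same Store/Bytes setup as the tests above,
// an async context, and a commit timestamp `ts` obtained elsewhere.
async fn list_keys_example(store: &Store, ts: u64) -> Result<()> {
    // Write two keys so the range reads below return something.
    let mut txn = store.begin()?;
    txn.set(&Bytes::from("k1"), &Bytes::from("v1"))?;
    txn.set(&Bytes::from("k2"), &Bytes::from("v2"))?;
    txn.commit().await?;

    let range = "k1".as_bytes()..="k2".as_bytes();
    let ro = store.begin_with_mode(Mode::ReadOnly)?;

    // New `keys` API: live (non-soft-deleted) keys only, optionally capped.
    let all_keys = ro.keys(range.clone(), None)?;     // every live key in the range
    let first_key = ro.keys(range.clone(), Some(1))?; // at most one key

    // `keys_at_ts` now takes the same optional limit for point-in-time reads.
    let keys_at_ts = ro.keys_at_ts(range, ts, Some(10))?;

    let _ = (all_keys, first_key, keys_at_ts);
    Ok(())
}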