chore: add test to check for ssi insert
arriqaaq committed Dec 5, 2024
1 parent 8c07470 commit c36efdf
Showing 1 changed file with 106 additions and 7 deletions.
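
The change threads an is_ssi flag through the test-only create_store helper so the new concurrency test runs once under the default isolation level and once under SerializableSnapshotIsolation. As a reading aid, below is a minimal sketch of the flow the test exercises. It is assumed to live inside the same tests module (so Options, Store, IsolationLevel, Bytes, and the create_temp_directory helper come from the module's existing imports), and the key/value literals are illustrative, not part of the commit:

async fn ssi_insert_sketch() {
    // Open a store with SSI enabled, mirroring create_store(_, true) in the diff below.
    let temp_dir = create_temp_directory();
    let mut opts = Options::new();
    opts.dir = temp_dir.path().to_path_buf();
    opts.isolation_level = IsolationLevel::SerializableSnapshotIsolation;
    let store = Store::new(opts).expect("should create store");

    // Commit a single insert, then read back its versionstamp; the new test
    // asserts that the transaction-id component (res.0) keeps increasing,
    // including across a close-and-reopen of the store.
    let (key, value) = (Bytes::from("key1"), Bytes::from("value1"));
    let mut txn = store.begin().unwrap();
    txn.set(&key, &value).unwrap();
    txn.commit().await.unwrap();
    let res = txn.get_versionstamp().unwrap();
    assert!(res.0 >= 1);

    store.close().await.unwrap();
}
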
113 changes: 106 additions & 7 deletions src/storage/kv/store.rs
@@ -759,7 +759,7 @@ mod tests {
use crate::storage::kv::store::{Store, Task, TaskRunner};
use crate::storage::kv::transaction::Durability;
use crate::storage::log::Error as LogError;
-use crate::Error;
+use crate::{Error, IsolationLevel};

use async_channel::bounded;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -1639,10 +1639,13 @@ mod tests {
}

// Common setup logic for creating a store
-fn create_store(dir: Option<TempDir>) -> (Store, TempDir) {
+fn create_store(dir: Option<TempDir>, is_ssi: bool) -> (Store, TempDir) {
let temp_dir = dir.unwrap_or_else(create_temp_directory);
let mut opts = Options::new();
opts.dir = temp_dir.path().to_path_buf();
if is_ssi {
opts.isolation_level = IsolationLevel::SerializableSnapshotIsolation;
}
(
Store::new(opts.clone()).expect("should create store"),
temp_dir,
@@ -1651,7 +1654,7 @@

#[tokio::test]
async fn test_store_version_after_reopen() {
-let (store, temp_dir) = create_store(None);
+let (store, temp_dir) = create_store(None, false);

// Define the number of keys, key size, and value size
let num_keys = 10000u32;
@@ -1682,7 +1685,7 @@ mod tests {
store.close().await.unwrap();

// Reopen the store
-let (store, _) = create_store(Some(temp_dir));
+let (store, _) = create_store(Some(temp_dir), false);

// Verify if the indexer version is set correctly
assert_eq!(
@@ -1703,7 +1706,7 @@

#[tokio::test]
async fn test_incremental_transaction_ids_post_store_open() {
-let (store, temp_dir) = create_store(None);
+let (store, temp_dir) = create_store(None, false);

let total_records = 1000;
let multiple_keys_records = total_records / 2;
@@ -1744,7 +1747,7 @@ mod tests {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

// Reopen the store
-let (store, _) = create_store(Some(temp_dir));
+let (store, _) = create_store(Some(temp_dir), false);

// Commit a new transaction with a single key
{
Expand Down Expand Up @@ -1900,7 +1903,7 @@ mod tests {
assert!(slow_done_rx.recv().await.is_ok());
}

-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
async fn stop_task_runner_concurrent_tasks() {
// Create a temporary directory for testing
let temp_dir = create_temp_directory();
@@ -1965,4 +1968,100 @@ mod tests {
total_tasks, final_count
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_incremental_transaction_ids_concurrent() {
for is_ssi in [false, true] {
let (store, temp_dir) = create_store(None, is_ssi);
let store = Arc::new(store);

let total_records = 1000;
let multiple_keys_records = total_records / 2;

// Define keys and values
let keys: Vec<Bytes> = (1..=total_records)
.map(|i| Bytes::from(format!("key{}", i)))
.collect();
let values: Vec<Bytes> = (1..=total_records)
.map(|i| Bytes::from(format!("value{}", i)))
.collect();

// Insert multiple transactions with single keys concurrently
let mut tasks = Vec::new();
for (i, key) in keys.iter().enumerate().take(multiple_keys_records) {
let store = store.clone();
let key = key.clone();
let value = values[i].clone();
tasks.push(tokio::spawn(async move {
let mut txn = store.begin().unwrap();
txn.set(&key, &value).unwrap();
txn.commit().await.unwrap();
}));
}

// Wait for all tasks to complete
for task in tasks {
task.await.unwrap();
}

// Insert multiple transactions with multiple keys concurrently
let mut tasks = Vec::new();
for (i, key) in keys
.iter()
.enumerate()
.skip(multiple_keys_records)
.take(multiple_keys_records)
{
let store = store.clone();
let key = key.clone();
let value = values[i].clone();
let next_key = keys[(i + multiple_keys_records) % keys.len()].clone();
let next_value = values[(i + multiple_keys_records) % values.len()].clone();
tasks.push(tokio::spawn(async move {
let mut txn = store.begin().unwrap();
txn.set(&key, &value).unwrap();
txn.set(&next_key, &next_value).unwrap();
txn.commit().await.unwrap();
}));
}

// Wait for all tasks to complete
for task in tasks {
task.await.unwrap();
}
// Close the store
store.close().await.unwrap();

// Add delay to ensure that the store is closed
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

// Reopen the store
let (store, _) = create_store(Some(temp_dir), is_ssi);

// Commit a new transaction with a single key
{
let mut txn = store.begin().unwrap();
txn.set(&keys[0], &values[0]).unwrap();
txn.commit().await.unwrap();

let res = txn.get_versionstamp().unwrap();

assert_eq!(res.0, total_records as u64 + 1);
}

// Commit another new transaction with multiple keys
{
let mut txn = store.begin().unwrap();
txn.set(&keys[1], &values[1]).unwrap();
txn.set(&keys[2], &values[2]).unwrap();
txn.commit().await.unwrap();

let res = txn.get_versionstamp().unwrap();

assert_eq!(res.0, total_records as u64 + 2);
}

store.close().await.unwrap();
}
}
}
