Skip to content

Commit

Permalink
feat: add in memory DB tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tbrezot committed Feb 14, 2024
1 parent c7819d2 commit d384384
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 8 deletions.
101 changes: 96 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::{
DbInterfaceErrorTrait,
};

// TODO: add `connect`/`setup` to the DX interface. First check if all existing
// interfaces allow it.
#[async_trait(?Send)]
pub trait DbInterface {
/// Type of error returned by the EDX.
Expand Down Expand Up @@ -47,27 +49,31 @@ pub trait DbInterface {
/// Deletes the lines associated to the given tokens from the EDX.
async fn delete(&self, tokens: TokenSet) -> Result<(), Self::Error>;

/// Returns all data stored through this interface.
async fn dump(&self) -> Result<Edx, Self::Error>;
}

#[cfg(any(test, feature = "in_memory"))]
pub mod in_memory {
pub mod tests {
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fmt::{Debug, Display},
ops::Deref,
sync::{Arc, Mutex},
thread::spawn,
};

use async_trait::async_trait;
use cosmian_crypto_core::CryptoCoreError;
#[cfg(feature = "in_memory")]
use cosmian_crypto_core::{bytes_ser_de::Serializable, Nonce};
use cosmian_crypto_core::{CryptoCoreError, CsRng};
use futures::executor::block_on;
use rand::{RngCore, SeedableRng};

use super::{DbInterface, Edx, TokenSet};
use crate::error::DbInterfaceErrorTrait;
#[cfg(feature = "in_memory")]
use crate::parameters::{MAC_LENGTH, NONCE_LENGTH};
use crate::{dx_enc::Token, error::DbInterfaceErrorTrait};

#[derive(Debug)]
pub struct InMemoryDbError(String);
Expand All @@ -87,7 +93,7 @@ pub mod in_memory {
impl std::error::Error for InMemoryDbError {}
impl DbInterfaceErrorTrait for InMemoryDbError {}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct InMemoryDb(Arc<Mutex<Edx>>);

impl Deref for InMemoryDb {
Expand Down Expand Up @@ -258,4 +264,89 @@ pub mod in_memory {
Ok(Edx::from(res))
}
}

/// Number of concurrent worker threads spawned by the tests below.
const N_WORKERS: usize = 100;

/// Concurrently inserts one random (token -> worker ID) binding per worker,
/// then checks that `dump` and `fetch` both return exactly the union of all
/// successful insertions.
#[test]
fn test_insert_then_dump_and_fetch() {
    let db = InMemoryDb::default();

    // Each worker draws a fresh random token and inserts its own ID under it.
    // Random 128+-bit tokens should not collide, so no insert is expected to
    // be rejected.
    let handles = (0..N_WORKERS)
        .map(|i| {
            let db = db.clone();
            spawn(move || -> Result<_, <InMemoryDb as DbInterface>::Error> {
                let mut rng = CsRng::from_entropy();
                let mut tok = Token::default();
                rng.fill_bytes(&mut tok);
                let data = vec![i as u8];
                let rejected_items =
                    block_on(db.insert(Edx::from(HashMap::from_iter([(tok, data.clone())]))))?;
                if rejected_items.is_empty() {
                    Ok((tok, data))
                } else {
                    Err(InMemoryDbError("some items were rejected".to_string()))
                }
            })
        })
        .collect::<Vec<_>>();

    // Propagate worker panics instead of silently dropping them: the previous
    // `flat_map(|h| h.join())` skipped any `Err` produced by a panicked
    // thread, which could let the test pass with fewer insertions than
    // intended.
    let inserted_dx = handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect::<Result<HashMap<_, _>, _>>()
        .unwrap();

    // The dump must contain exactly the bindings inserted above.
    let stored_dx = block_on(db.dump()).unwrap();
    assert_eq!(inserted_dx, *stored_dx);

    // Fetching every inserted token must return the same bindings.
    let res = block_on(db.fetch(inserted_dx.keys().copied().collect())).unwrap();
    assert_eq!(inserted_dx, *res);
}

/// Concurrency test: `N_WORKERS` threads race to append their own ID to the
/// value bound to a single shared token, each using an optimistic
/// insert-then-upsert retry loop. At the end, the stored value must contain
/// every worker ID.
#[test]
fn test_concurrent_upsert() {
    let db = InMemoryDb::default();
    let mut rng = CsRng::from_entropy();
    // One shared token contended by all workers.
    let mut tok = Token::default();
    rng.fill_bytes(&mut tok);

    let handles = (0..N_WORKERS)
        .map(|i| {
            let db = db.clone();
            spawn(move || -> Result<_, <InMemoryDb as DbInterface>::Error> {
                let data = vec![i as u8];
                // First attempt: plain insert. On conflict the DB returns the
                // rejected binding along with its currently stored value.
                let mut rejected_items =
                    block_on(db.insert(Edx::from(HashMap::from_iter([(tok, data.clone())]))))?;
                // Optimistic retry loop: merge this worker's ID into the
                // current value and try again until no conflict remains.
                while !rejected_items.is_empty() {
                    let mut new_data = rejected_items
                        .get(&tok)
                        .ok_or_else(|| {
                            InMemoryDbError(format!(
                                "thread {i} did not retrieve token current value"
                            ))
                        })?
                        .clone();
                    new_data.extend(&data);
                    // NOTE(review): presumably `upsert` replaces the value
                    // only while the stored value still equals the first
                    // argument, returning the fresher value otherwise —
                    // confirm against the `DbInterface` contract.
                    rejected_items = block_on(db.upsert(
                        rejected_items,
                        Edx::from(HashMap::from_iter([(tok, new_data)])),
                    ))?;
                }
                Ok(())
            })
        })
        .collect::<Vec<_>>();

    // First `unwrap` surfaces thread panics, second surfaces DB errors.
    for h in handles {
        h.join().unwrap().unwrap();
    }

    // A single token was used, so the EDX must hold exactly one entry.
    let dx = block_on(db.dump()).unwrap();
    assert_eq!(dx.len(), 1);

    let stored_data = dx.get(&tok).unwrap();
    let ids = stored_data.iter().copied().collect::<HashSet<_>>();

    // Check all threads have successfully inserted their ID.
    assert_eq!(ids, (0..N_WORKERS as u8).collect::<HashSet<u8>>());
}
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ mod mm_enc;
mod parameters;

#[cfg(any(test, feature = "in_memory"))]
pub use db::in_memory::{InMemoryDb, InMemoryDbError};
pub use db::tests::{InMemoryDb, InMemoryDbError};
pub use db::DbInterface;
pub use dx_enc::{CsRhDxEnc, DynRhDxEnc};
pub use error::{CoreError, DbInterfaceErrorTrait, Error};

// pub use mm_enc::{CsRhMmEnc, Findex};
// pub use index::{
// Data, Findex, Index, IndexedValueToKeywordsMap, Keyword, KeywordToDataMap, Keywords, Label,
Expand Down
9 changes: 7 additions & 2 deletions src/mm_enc/findex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ fn recompose(chain: &[Link]) -> Result<Vec<Vec<u8>>, CoreError> {
// type is a vector, thus the linked list needs to be converted. All in all,
// this method adds two iterations/allocations: there may be a better
// way. Maybe sticking with vectors is the way to go.
//
// TODO: use a BTreeMap
let mut purged_value = LinkedList::new();
let mut remaining_items = HashSet::new();
let mut deleted_items = HashSet::new();
Expand All @@ -135,12 +137,15 @@ fn recompose(chain: &[Link]) -> Result<Vec<Vec<u8>>, CoreError> {
Ok(purged_value.into_iter().collect())
}

/// Findex is a CS-RH-MM-Enc scheme.
///
/// It relies on a generic CS-RH-DX-Enc and a generic Dyn-RH-DX-Enc schemes.
#[derive(Debug)]
pub struct Findex<
const TAG_LENGTH: usize,
Tag: Hash + PartialEq + Eq + From<[u8; TAG_LENGTH]> + Into<[u8; TAG_LENGTH]>,
EntryDxEnc: CsRhDxEnc<TAG_LENGTH, METADATA_LENGTH, Tag>,
ChainDxEnc: DynRhDxEnc<LINK_LENGTH>,
EntryDxEnc: CsRhDxEnc<TAG_LENGTH, METADATA_LENGTH, Tag, Item = Metadata>,
ChainDxEnc: DynRhDxEnc<LINK_LENGTH, Item = Link>,
> {
pub entry: EntryDxEnc,
pub chain: ChainDxEnc,
Expand Down

0 comments on commit d384384

Please sign in to comment.