Skip to content

Commit

Permalink
modularize encode/decode
Browse files Browse the repository at this point in the history
  • Loading branch information
tbrezot committed Jun 26, 2024
1 parent cb3d475 commit 0e25e2d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 41 deletions.
82 changes: 53 additions & 29 deletions src/findex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,45 @@ use crate::{
};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
pub enum Op {
Insert,
Delete,
}

pub struct Findex<'a, Memory: 'a + Stm<Address = Address<ADDRESS_LENGTH>, Word = Vec<u8>>> {
// The lifetime is needed to store a reference to the memory in the vectors.
pub struct Findex<'a, Value, Memory: 'a + Stm<Address = Address<ADDRESS_LENGTH>, Word = Vec<u8>>>
where
Value: TryFrom<Vec<u8>>,
Vec<u8>: From<Value>,
{
el: MemoryEncryptionLayer<Memory>,
vectors: Mutex<HashMap<Address<ADDRESS_LENGTH>, OVec<'a, MemoryEncryptionLayer<Memory>>>>,
encode: Box<fn(Op, HashSet<Value>) -> Vec<Vec<u8>>>,
decode: Box<fn(Vec<Vec<u8>>) -> HashSet<Value>>,
}

impl<'a, Memory: 'a + Stm<Address = Address<ADDRESS_LENGTH>, Word = Vec<u8>>> Findex<'a, Memory> {
impl<'a, Value, Memory: 'a + Stm<Address = Address<ADDRESS_LENGTH>, Word = Vec<u8>>>
Findex<'a, Value, Memory>
where
Value: TryFrom<Vec<u8>>,
Vec<u8>: From<Value>,
{
/// Instantiates Findex with the given seed, and memory.
pub fn new(seed: Secret<KEY_LENGTH>, rng: Arc<Mutex<CsRng>>, stm: Memory) -> Self {
pub fn new(
seed: Secret<KEY_LENGTH>,
rng: Arc<Mutex<CsRng>>,
stm: Memory,
encode: Box<fn(Op, HashSet<Value>) -> Vec<Vec<u8>>>,
decode: Box<fn(Vec<Vec<u8>>) -> HashSet<Value>>,
) -> Self {
// TODO: should the RNG be instantiated here?
// Creating many instances of Findex would need more work but potentially involve less
// waiting for the lock => bench it.
Self {
el: MemoryEncryptionLayer::new(seed, rng, stm),
vectors: Mutex::new(HashMap::new()),
encode,
decode,
}
}

Expand Down Expand Up @@ -64,19 +84,6 @@ impl<'a, Memory: 'a + Stm<Address = Address<ADDRESS_LENGTH>, Word = Vec<u8>>> Fi
a
}

/// Converts every value yielded by `values` into a word (a byte vector).
///
/// The words are returned in the order in which the iterator yields the values. The operation
/// is currently ignored: all values are converted the same way whatever `_op` is.
fn decompose<Value>(_op: Op, values: impl Iterator<Item = Value>) -> Vec<Vec<u8>>
where
    for<'z> Vec<u8>: From<&'z Value>,
{
    let mut words = Vec::new();
    for value in values {
        // The conversion only borrows the value, which is dropped right after.
        words.push(<Vec<u8>>::from(&value));
    }
    words
}

/// Rebuilds the set of values from the given words.
///
/// Each word is converted into a `Value`; words converting to equal values are merged since
/// the result is a set.
fn recompose<Value: Hash + PartialEq + Eq + From<Vec<u8>>>(
    words: Vec<Vec<u8>>,
) -> HashSet<Value> {
    let mut values = HashSet::with_capacity(words.len());
    for word in words {
        values.insert(Value::from(word));
    }
    values
}

/// Pushes the given bindings to the vectors associated to the bound keyword.
///
/// All vector push operations are performed in parallel (via async calls), not batched.
Expand Down Expand Up @@ -135,9 +142,10 @@ impl<
Keyword: Hash + PartialEq + Eq + AsRef<[u8]>,
Value: Hash + PartialEq + Eq + From<Vec<u8>>,
Memory: 'a + Stm<Address = Address<ADDRESS_LENGTH>, Word = Vec<u8>>,
> Index<'a, Keyword, Value> for Findex<'a, Memory>
> Index<'a, Keyword, Value> for Findex<'a, Value, Memory>
where
for<'z> Vec<u8>: From<&'z Value>,
Value: TryFrom<Vec<u8>>,
Vec<u8>: From<Value>,
{
type Error = Error<Address<ADDRESS_LENGTH>, <MemoryEncryptionLayer<Memory> as Stm>::Error>;

Expand All @@ -151,7 +159,7 @@ where
let mut bindings = HashMap::new();
for fut in futures {
let (kw, vals) = fut.await?;
bindings.insert(kw, Self::recompose(vals));
bindings.insert(kw, (self.decode)(vals));
}
Ok(bindings)
}
Expand All @@ -160,27 +168,24 @@ where
&'a self,
bindings: impl Iterator<Item = (Keyword, HashSet<Value>)>,
) -> Result<(), Self::Error> {
self.push(
bindings.map(|(kw, values)| (kw, Self::decompose(Op::Insert, values.into_iter()))),
)
.await
self.push(bindings.map(|(kw, values)| (kw, (self.encode)(Op::Insert, values))))
.await
}

async fn delete(
&'a self,
bindings: impl Iterator<Item = (Keyword, HashSet<Value>)>,
) -> Result<(), Self::Error> {
self.push(
bindings.map(|(kw, values)| (kw, Self::decompose(Op::Delete, values.into_iter()))),
)
.await
self.push(bindings.map(|(kw, values)| (kw, (self.encode)(Op::Delete, values))))
.await
}
}

#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, HashSet},
hash::Hash,
sync::{Arc, Mutex},
};

Expand All @@ -189,12 +194,31 @@ mod tests {

use crate::{address::Address, kv::KvStore, Findex, Index, Value, ADDRESS_LENGTH};

use super::Op;

/// Test encoder: turns each value of the set into one word (a byte vector).
///
/// The operation is ignored. The order of the returned words follows the iteration order of
/// the set, which is unspecified.
fn encode<Value>(_op: Op, values: HashSet<Value>) -> Vec<Vec<u8>>
where
    for<'z> Vec<u8>: From<&'z Value>,
{
    let mut words = Vec::with_capacity(values.len());
    for value in values {
        // The conversion only needs a borrow of the value.
        words.push(<Vec<u8>>::from(&value));
    }
    words
}

/// Test decoder: converts each word into a value and gathers the values into a set.
///
/// Words converting to equal values end up as a single entry of the returned set.
fn decode<Value: Hash + PartialEq + Eq + From<Vec<u8>>>(words: Vec<Vec<u8>>) -> HashSet<Value> {
    let mut values = HashSet::with_capacity(words.len());
    for word in words {
        values.insert(Value::from(word));
    }
    values
}

#[test]
fn test_insert_search() {
let mut rng = CsRng::from_entropy();
let seed = Secret::random(&mut rng);
let kv = KvStore::<Address<ADDRESS_LENGTH>, Vec<u8>>::default();
let findex = Findex::new(seed, Arc::new(Mutex::new(rng)), kv);
let findex = Findex::new(
seed,
Arc::new(Mutex::new(rng)),
kv,
Box::new(encode),
Box::new(decode),
);
let bindings = HashMap::<&str, HashSet<Value>>::from_iter([
(
"cat",
Expand Down
27 changes: 15 additions & 12 deletions src/ovec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ impl<
} else {
self.h = cur;
// Findex modifications are only lock-free, hence it does not guarantee a given
// client will ever terminate. It arguably will if the index is not highly
// contended, but we need a stronger guarantee. Maybe return an error after
// reaching a certain number of retries.
// client will ever terminate.
//
// TODO: this loop will arguably terminate if the index is not highly contended,
// but we need a stronger guarantee. Maybe return an error after reaching
// a certain number of retries.
}
}
}
Expand Down Expand Up @@ -149,18 +151,19 @@ impl<
.unwrap_or_default();

// Get all missing values, if any.
let missing_res = self
.m
.batch_read(
(cur_header.start.max(old_header.stop)..cur_header.stop)
.map(|i| self.a.clone() + i + 1)
.collect(),
)
.await?;
let missing_addresses = (cur_header.start.max(old_header.stop)..cur_header.stop)
.map(|i| self.a.clone() + i + 1)
.collect::<Vec<_>>();

let missing_values = if missing_addresses.is_empty() {
vec![] // only call the memory a second time if needed
} else {
self.m.batch_read(missing_addresses).await?
};

res.into_iter()
.skip(1)
.chain(missing_res)
.chain(missing_values)
.enumerate()
.map(|(i, v)| v.ok_or_else(|| Error::MissingValue(self.a.clone() + i as u64)))
.collect()
Expand Down

0 comments on commit 0e25e2d

Please sign in to comment.