fix: better compact errors
tbrezot committed Nov 9, 2023
1 parent f22183e commit 182e1b2
Showing 6 changed files with 96 additions and 55 deletions.
src/edx/mod.rs (3 additions, 3 deletions)
@@ -326,12 +326,12 @@ pub mod in_memory {
         Ok(TokenWithEncryptedValueList::from(
             tokens
                 .into_iter()
-                .filter_map(|uid| {
+                .filter_map(|token| {
                     self.lock()
                         .expect("couldn't lock the table")
-                        .get(&uid)
+                        .get(&token)
                         .cloned()
-                        .map(|v| (uid, v))
+                        .map(|v| (token, v))
                 })
                 .collect::<Vec<_>>(),
         ))
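
The rename from `uid` to `token` aside, the idiom in this hunk, batch lookup in a shared map keeping only the hits, can be sketched standalone. Here `u64` tokens and `Vec<u8>` values stand in for the crate's `Token` and encrypted value types (both assumptions), and the lock is taken once up front rather than once per token:

    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Hypothetical stand-in for the in-memory EDX table: token -> encrypted value.
    struct InMemoryTable(Mutex<HashMap<u64, Vec<u8>>>);

    impl InMemoryTable {
        /// Returns the (token, value) pairs found in the table; tokens
        /// without an entry are silently skipped, as `filter_map` does above.
        fn get_batch(&self, tokens: Vec<u64>) -> Vec<(u64, Vec<u8>)> {
            let table = self.0.lock().expect("couldn't lock the table");
            tokens
                .into_iter()
                .filter_map(|token| table.get(&token).cloned().map(|v| (token, v)))
                .collect()
        }
    }
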
src/findex_graph/compact.rs (1 addition, 1 deletion)
@@ -74,7 +74,7 @@ impl<
         );
         let indexed_values = indexed_values
             .into_iter()
-            .map(|(token, value)| (token, value.iter().map(Into::into).collect()))
+            .map(|(token, values)| (token, values.iter().map(Into::into).collect()))
             .collect();
         self.findex_mm
             .complete_compacting(rng, key, indexed_values, continuation, label)
src/findex_mm/compact.rs (53 additions, 28 deletions)
@@ -25,9 +25,11 @@ impl<
         Ok(self.entry_table.dump_tokens().await?.into_iter().collect())
     }

-    /// Fetches all data needed to compact targeted chains among the ones
-    /// associated with the given tokens. Returns the indexed values in these
-    /// chains along with data used to finish the compacting operation.
+    /// Fetches all chains associated with the given tokens.
+    ///
+    /// # Returns
+    ///
+    /// All the data needed to compact the targeted chains only.
     pub async fn prepare_compacting(
         &self,
         key: &<Self as MmEnc<SEED_LENGTH, UserError>>::Key,
@@ -52,7 +54,7 @@
             },
         )?;

-        let metadata = entries
+        let chain_metadata = entries
             .iter()
             .filter(|(token, _)| compact_target.contains(token))
             .map(|(token, entry)| (*token, self.derive_metadata(entry)))
@@ -61,7 +63,7 @@
         let encrypted_links = self
             .chain_table
             .get(
-                metadata
+                chain_metadata
                     .iter()
                     .flat_map(|(_, (_, chain_tokens))| chain_tokens)
                     .copied()
@@ -84,26 +86,34 @@
             },
         )?;

-        let mut indexed_values = HashMap::with_capacity(metadata.len());
-        for (entry_token, (chain_key, chain_tokens)) in &metadata {
-            let links = chain_tokens
-                .iter()
-                .filter_map(|token| {
-                    encrypted_links.get(token).map(|encrypted_link| {
+        let indexed_chains = chain_metadata
+            .iter()
+            .map(|(entry_token, (chain_key, chain_tokens))| {
+                let links = chain_tokens
+                    .iter()
+                    .filter_map(|token| encrypted_links.get(token))
+                    .map(|encrypted_link| {
                         self.chain_table
                             .resolve(chain_key, encrypted_link)
                             .map(Link)
                     })
-                })
-                .collect::<Result<Vec<_>, _>>()?;
+                    // Collect in a vector to preserve the chain order.
+                    .collect::<Result<Vec<_>, _>>()?;

-            indexed_values.insert(
-                *entry_token,
-                self.recompose::<BLOCK_LENGTH, LINE_WIDTH>(&links)?,
-            );
-        }
-
-        Ok((indexed_values, CompactingData { metadata, entries }))
+                Ok((
+                    *entry_token,
+                    self.recompose::<BLOCK_LENGTH, LINE_WIDTH>(&links)?,
+                ))
+            })
+            .collect::<Result<_, Error<UserError>>>()?;
+
+        Ok((
+            indexed_chains,
+            CompactingData {
+                metadata: chain_metadata,
+                entries,
+            },
+        ))
     }

     /// Completes the compacting operation:
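
This hunk is a textbook conversion of a fallible imperative loop into an iterator pipeline: an iterator of `Result<(K, V), E>` collects straight into `Result<HashMap<K, V>, E>`, short-circuiting on the first error. A self-contained sketch of the same idiom, with string keys and a parse step standing in for the crate's tokens and links (all names illustrative):

    use std::collections::HashMap;
    use std::num::ParseIntError;

    /// Builds a map from each key to its parsed values, failing fast on the
    /// first parse error, exactly as the hunk above does for chain links.
    fn parse_all(
        input: &HashMap<String, Vec<String>>,
    ) -> Result<HashMap<&str, Vec<u64>>, ParseIntError> {
        input
            .iter()
            .map(|(key, raw_values)| {
                let values = raw_values
                    .iter()
                    .map(|v| v.parse::<u64>())
                    // Collect in a vector to preserve the input order.
                    .collect::<Result<Vec<_>, _>>()?;
                Ok((key.as_str(), values))
            })
            .collect()
    }

The inner `collect::<Result<Vec<_>, _>>()?` preserves element order, which matters for chain links; the outer `collect` builds the map and propagates any error.
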
@@ -193,22 +203,37 @@
         if res.is_err() {
             self.chain_table.delete(new_links_tokens).await?;
             return Err(Error::Crypto(format!(
-                "An error occurred during the compact operation. All modifications were reverted. \
+                "An error occurred during the insert operation. All modifications were reverted. \
                  ({res:?})"
             )));
         };
-        let res = self.entry_table.upsert(HashMap::new(), new_entries).await;
-        if res.as_ref().map(HashMap::is_empty).unwrap_or(false) {
+
+        let upsert_results = self.entry_table.upsert(HashMap::new(), new_entries).await;
+
+        let res = if let Ok(upsert_results) = upsert_results {
+            if upsert_results.is_empty() {
+                Ok(())
+            } else {
+                Err(Error::Crypto(format!(
+                    "A conflict occurred during the upsert operation. All modifications were \
+                     reverted. ({upsert_results:?})"
+                )))
+            }
+        } else {
+            Err(Error::Crypto(
+                "An error occurred during the upsert operation. All modifications were reverted."
+                    .to_string(),
+            ))
+        };
+
+        if res.is_ok() {
             self.chain_table.delete(old_links).await?;
             self.entry_table.delete(old_entries).await?;
-            Ok(())
         } else {
             self.chain_table.delete(new_links_tokens).await?;
             self.entry_table.delete(new_entry_tokens).await?;
-            Err(Error::Crypto(format!(
-                "An error occurred during the compact operation. All modifications were reverted. \
-                 ({res:?})"
-            )))
         }
+
+        res
     }
 }
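
The new control flow distinguishes three outcomes of the upsert: a backend error, a non-empty set of conflicting entries, and success; in both failure cases the freshly written tokens are deleted so the operation leaves no trace. A minimal sketch of this compensate-on-failure shape, where the `Store` type, its methods, and the `String` error are hypothetical stand-ins for the crate's tables:

    /// Hypothetical store with fallible batch operations.
    struct Store;

    impl Store {
        /// Returns the items that conflicted; an empty set means full success.
        fn upsert(&self, items: Vec<u64>) -> Result<Vec<u64>, String> {
            Ok(items.into_iter().filter(|i| i % 2 == 0).collect())
        }

        fn delete(&self, _items: Vec<u64>) -> Result<(), String> {
            Ok(())
        }
    }

    /// Inserts `new_items`, then either commits (deletes `old_items`) or
    /// compensates (deletes `new_items`), mirroring the hunk above.
    fn swap(store: &Store, new_items: Vec<u64>, old_items: Vec<u64>) -> Result<(), String> {
        let res = match store.upsert(new_items.clone()) {
            // An empty conflict set means the upsert fully succeeded.
            Ok(conflicts) if conflicts.is_empty() => Ok(()),
            Ok(conflicts) => Err(format!("conflict on {conflicts:?}; reverted")),
            Err(e) => Err(format!("upsert failed: {e}; reverted")),
        };
        if res.is_ok() {
            store.delete(old_items)?; // commit: drop the superseded data
        } else {
            store.delete(new_items)?; // compensate: drop the new data
        }
        res
    }
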
src/findex_mm/mm.rs (1 addition, 0 deletions)
@@ -176,6 +176,7 @@ impl<
     /// - merges the data from the stacked block and fill the stack;
     /// - if this value was an addition, adds it to the set, otherwise removes
     ///   any matching value from the set.
+    // TODO (TBZ): take an iterator as input to avoid needless collections.
     pub(crate) fn recompose<const BLOCK_LENGTH: usize, const LINE_LENGTH: usize>(
         &self,
         chain: &[Link],
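
One possible shape for that TODO, with `Link` reduced to a plain byte buffer and recomposition faked as concatenation (both assumptions, not the crate's block-merging logic):

    /// Stand-in for the crate's `Link` type.
    struct Link(Vec<u8>);

    /// Recomposes a chain without requiring the caller to build a `&[Link]`:
    /// any iterator over links can be consumed lazily.
    fn recompose<'a>(chain: impl Iterator<Item = &'a Link>) -> Vec<u8> {
        chain.flat_map(|link| link.0.iter().copied()).collect()
    }

A caller holding a `Vec<Link>` would pass `links.iter()`, while a caller producing links on the fly avoids the intermediate collection the TODO points at.
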
src/index/mod.rs (32 additions, 23 deletions)
@@ -238,7 +238,9 @@ impl<
     ) -> Result<(), Error<UserError>> {
         if (old_key == new_key) && (old_label == new_label) {
             return Err(Error::Crypto(
-                "both the same key and label can be used to compact".to_string(),
+                "at least one of the new key or the new label must differ from the old one \
+                 for the compact operation"
+                    .to_string(),
             ));
         }

@@ -348,7 +350,7 @@ impl<
         (n_draws / n_compact_to_full as f64) as usize
     }

-    #[tracing::instrument(level = "trace", fields(compact_target = %compact_target, tokens = %tokens, new_label = %new_label), ret, err, skip(self, old_key, new_key, filter_obsolete_data))]
+    #[tracing::instrument(level = "trace", fields(tokens_to_compact = %tokens_to_compact, tokens_to_fetch = %tokens_to_fetch, new_label = %new_label), ret, err, skip(self, old_key, new_key, filter_obsolete_data))]
     async fn compact_batch<
         F: Future<Output = Result<HashSet<Location>, String>>,
         Filter: Fn(HashSet<Location>) -> F,
@@ -357,43 +359,50 @@
         old_key: &<FindexGraph<UserError, EntryTable, ChainTable> as GxEnc<UserError>>::Key,
         new_key: &<FindexGraph<UserError, EntryTable, ChainTable> as GxEnc<UserError>>::Key,
         new_label: &Label,
-        compact_target: &Tokens,
-        tokens: Tokens,
+        tokens_to_compact: &Tokens,
+        tokens_to_fetch: Tokens,
         filter_obsolete_data: &Filter,
     ) -> Result<(), Error<UserError>> {
-        let (mut indexed_values, data) = self
+        let (indexed_values, data) = self
             .findex_graph
-            .prepare_compact::<Keyword, Location>(old_key, tokens.into(), compact_target)
+            .prepare_compact::<Keyword, Location>(
+                old_key,
+                tokens_to_fetch.into(),
+                tokens_to_compact,
+            )
             .await?;

-        let locations = indexed_values
+        let indexed_locations = indexed_values
             .values()
             .flatten()
             .filter_map(IndexedValue::get_data)
             .cloned()
             .collect();

-        let remaining_locations = filter_obsolete_data(locations)
+        let remaining_locations = filter_obsolete_data(indexed_locations)
             .await
             .map_err(<Self as Index<EntryTable, ChainTable>>::Error::Filter)?;

-        for values in indexed_values.values_mut() {
-            let res = values
-                .iter()
-                .filter(|v| {
-                    if let Some(location) = v.get_data() {
-                        remaining_locations.contains(location)
-                    } else {
-                        true
-                    }
-                })
-                .cloned()
-                .collect();
-            *values = res;
-        }
+        let remaining_values = indexed_values
+            .into_iter()
+            .map(|(entry_token, associated_values)| {
+                let remaining_values = associated_values
+                    .into_iter()
+                    .filter(|value| {
+                        if let Some(location) = value.get_data() {
+                            remaining_locations.contains(location)
+                        } else {
+                            true
+                        }
+                    })
+                    .collect::<HashSet<_>>();
+                (entry_token, remaining_values)
+            })
+            .filter(|(_, remaining_values)| !remaining_values.is_empty())
+            .collect::<HashMap<_, _>>();

         self.findex_graph
-            .complete_compacting(self.rng.clone(), new_key, new_label, indexed_values, data)
+            .complete_compacting(self.rng.clone(), new_key, new_label, remaining_values, data)
             .await
     }
 }
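
Beyond the renames, the rewritten pipeline fixes a real gap: the old in-place loop could leave entries whose value set had been filtered down to empty, whereas the new version drops them. A condensed sketch of that filter-then-prune shape, with string keywords and `u64` locations as stand-ins and the branch that keeps non-location values elided:

    use std::collections::{HashMap, HashSet};

    /// Keeps only the allowed values under each key and drops keys whose
    /// value set ends up empty, as the new `compact_batch` code does.
    fn retain_allowed(
        index: HashMap<String, HashSet<u64>>,
        allowed: &HashSet<u64>,
    ) -> HashMap<String, HashSet<u64>> {
        index
            .into_iter()
            .map(|(key, values)| {
                let remaining = values
                    .into_iter()
                    .filter(|v| allowed.contains(v))
                    .collect::<HashSet<_>>();
                (key, remaining)
            })
            // Prune entries left empty by the filtering step.
            .filter(|(_, remaining)| !remaining.is_empty())
            .collect()
    }
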
src/lib.rs (6 additions, 0 deletions)
@@ -142,5 +142,11 @@ mod example {
             res,
             KeywordToDataMap::from_iter([(kwd1, HashSet::from_iter([loc1]))])
         );
+
+        let res = index
+            .compact(&key, &key, &label, &label, 1, &|res| async { Ok(res) })
+            .await;
+
+        assert!(res.is_err());
     }
 }
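
This new assertion pins down the guard added in src/index/mod.rs: compacting with both the key and the label unchanged is rejected. A call with a fresh label should still pass; a sketch reusing the example's `index`, `key`, and `label` bindings, where the `Label::from` byte-slice constructor is an assumption:

    // Changing the label (or the key) satisfies the new precondition.
    let new_label = Label::from("new label".as_bytes());

    let res = index
        .compact(&key, &key, &label, &new_label, 1, &|res| async { Ok(res) })
        .await;

    assert!(res.is_ok());
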
