Include tombstoned documents when calculating local log heights
sandreae committed Jun 14, 2024
1 parent 4b11f2b commit 1ab4e97
Showing 1 changed file with 60 additions and 15 deletions.
aquadoggo/src/replication/strategies/log_height.rs (75 changes: 60 additions & 15 deletions)
@@ -5,14 +5,14 @@ use std::collections::HashMap;
use anyhow::Result;
use async_trait::async_trait;
use log::trace;
-use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::DocumentId;
use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
use p2panda_rs::entry::{LogId, SeqNum};
use p2panda_rs::identity::PublicKey;
use p2panda_rs::schema::{Schema, SchemaId};
-use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};
+use p2panda_rs::storage_provider::traits::OperationStore;
use p2panda_rs::Human;
+use sqlx::query_scalar;

use crate::db::types::StorageEntry;
use crate::db::SqlStore;
@@ -133,21 +133,16 @@ impl LogHeightStrategy {
None => false,
};

-// Get the ids of all documents of this schema.
-let schema_documents: Vec<DocumentId> = store
-.get_documents_by_schema(schema_id)
-.await
-.unwrap()
-.iter()
-.map(|document| document.id())
-.cloned()
-.collect();
+// Retrieve ids for all documents in the store which follow a certain schema id. The result will
+// include the id for documents which were deleted as we still want to replicate any tombstone
+// operations.
+let schema_document_ids = get_all_document_ids_for_schema(&store, &schema_id).await;

let mut schema_blob_documents = vec![];

// If the target set included `blob_v1` schema_id then we collect any related blob documents.
if wants_blobs && has_blob_relation {
-for document_id in &schema_documents {
+for document_id in &schema_document_ids {
let blob_documents = store.get_blob_child_relations(document_id).await.unwrap();
schema_blob_documents.extend(blob_documents)
}
@@ -172,7 +167,7 @@ impl LogHeightStrategy {
}
}

-all_target_documents.extend(schema_documents);
+all_target_documents.extend(schema_document_ids);
all_blob_documents.extend(schema_blob_documents);
}

@@ -303,6 +298,36 @@ impl Strategy for LogHeightStrategy {
}
}

+async fn get_all_document_ids_for_schema(
+store: &SqlStore,
+schema_id: &SchemaId,
+) -> Vec<DocumentId> {
+query_scalar::<_, String>(
+"
+SELECT
+documents.document_id
+FROM
+documents
+LEFT JOIN operations_v1
+ON
+operations_v1.operation_id = documents.document_id
+WHERE
+documents.schema_id = $1
+",
+)
+.bind(schema_id.to_string())
+.fetch_all(&store.pool)
+.await
+.expect("No fatal database error to occur")
+.iter()
+.map(|id_string| {
+id_string
+.parse()
+.expect("All document id strings stored in the database are valid")
+})
+.collect()
+}

#[cfg(test)]
mod tests {
use p2panda_rs::document::traits::AsDocument;
@@ -326,8 +351,8 @@ mod tests {
use crate::replication::{LogHeightStrategy, LogHeights, Message, SchemaIdSet};
use crate::test_utils::{
add_blob, add_schema_and_documents, generate_key_pairs, populate_and_materialize,
-populate_store_config, test_runner_with_manager, PopulateStoreConfig, TestNode,
-TestNodeManager,
+populate_store_config, test_runner, test_runner_with_manager, PopulateStoreConfig,
+TestNode, TestNodeManager,
};

// Helper for retrieving operations ordered as expected for replication and testing the result.
@@ -706,4 +731,24 @@ mod tests {
assert!(log_heights.is_empty());
});
}

+#[rstest]
+fn ids_for_tombstoned_documents_included_in_target_set(
+#[from(populate_store_config)]
+#[with(5, 2, vec![KeyPair::new()], true)] // logs include tombstone operation
+config: PopulateStoreConfig,
+) {
+test_runner(move |mut node: TestNode| async move {
+let target_set = SchemaIdSet::new(&[config.schema.id().to_owned()]);
+let documents = populate_and_materialize(&mut node, &config).await;
+let document_ids: Vec<DocumentId> =
+documents.iter().map(AsDocument::id).cloned().collect();
+let strategy =
+LogHeightStrategy::new(&target_set, node.context.schema_provider.clone());
+
+let included_documents = strategy.included_document_ids(&node.context.store).await;
+
+assert_eq!(included_documents, document_ids);
+});
+}
}
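
The gist of the change: document ids were previously collected through DocumentStore::get_documents_by_schema, which excludes deleted documents, so tombstoned documents dropped out of the replication target set and their tombstone operations could be missed during replication. The new get_all_document_ids_for_schema helper queries the documents table directly and also returns the ids of tombstoned documents. Below is a minimal sketch contrasting the two id sources, written as if it sat in the same module as the new helper; count_id_sources is a hypothetical name and not part of the commit.

use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::DocumentId;
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::traits::DocumentStore;

use crate::db::SqlStore;

// Hypothetical comparison of the old and new id sources; not part of the commit.
async fn count_id_sources(store: &SqlStore, schema_id: &SchemaId) -> (usize, usize) {
    // Old path: the store trait only yields documents which still exist.
    let undeleted: Vec<DocumentId> = store
        .get_documents_by_schema(schema_id)
        .await
        .expect("store query succeeds")
        .iter()
        .map(|document| document.id())
        .cloned()
        .collect();

    // New path: ids of deleted documents are included as well, so their
    // tombstone operations can still be offered during replication.
    let all_ids = get_all_document_ids_for_schema(store, schema_id).await;

    (undeleted.len(), all_ids.len())
}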
