Skip to content

Commit

Permalink
Pick up un-indexed operations when starting materializer service, add…
Browse files Browse the repository at this point in the history
… a test
  • Loading branch information
adzialocha committed Jun 14, 2024
1 parent 41981ea commit 568cf06
Showing 1 changed file with 135 additions and 37 deletions.
172 changes: 135 additions & 37 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,47 +87,63 @@ pub async fn materializer_service(
factory.queue(task.to_owned());
});

// Subscribe to communication bus
let mut rx = tx.subscribe();

// Listen to incoming new entries and operations and move them into task queue
let handle = task::spawn(async move {
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
let handle = {
let context = context.clone();

// Subscribe to communication bus
let mut rx = tx.subscribe();

// Listen to incoming new entries and operations and move them into task queue
task::spawn(async move {
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving document id by operation_id {}",
operation_id
)
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
};
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
};
}
}
}
});
})
};

debug!("Materialiser service is ready");
if tx_ready.send(()).is_err() {
warn!("No subscriber informed about materialiser service being ready");
};

// Re-apply unmaterialized operations as they might have slipped through in an unexpected crash
// or node shutdown
let unindexed_operation_ids = context
.store
.get_unindexed_operation_ids()
.await
.unwrap_or_else(|_| panic!("Failed database query when loading unindexed operation ids"));

for id in unindexed_operation_ids {
let _ = tx.send(ServiceMessage::NewOperation(id));
}

// Wait until we received the application shutdown signal or handle closed
tokio::select! {
_ = handle => (),
Expand All @@ -144,6 +160,7 @@ mod tests {
use std::time::Duration;

use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::DocumentId;
use p2panda_rs::entry::traits::AsEncodedEntry;
use p2panda_rs::identity::KeyPair;
use p2panda_rs::operation::{Operation, OperationId, OperationValue};
Expand Down Expand Up @@ -175,7 +192,8 @@ mod tests {
) {
// Prepare database which inserts data for one document
test_runner(move |node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
// Populate the store with some entries and operations but DON'T materialise any
// resulting documents
let documents = populate_store(&node.context.store, &config).await;
let document_id = documents[0].id();

Expand Down Expand Up @@ -252,12 +270,13 @@ mod tests {
config: PopulateStoreConfig,
) {
test_runner(move |node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
// Populate the store with some entries and operations but DON'T materialise any
// resulting documents
let documents = populate_store(&node.context.store, &config).await;
let document_id = documents[0].id();

// Store a pending "reduce" task from last runtime in the database so it gets picked up by
// the materializer service
// Store a pending "reduce" task from last runtime in the database so it gets picked up
// by the materializer service
node.context
.store
.insert_task(&Task::new(
Expand Down Expand Up @@ -295,8 +314,8 @@ mod tests {
panic!("Service dropped");
}

// Wait for service to be done .. it should materialize the document since it was waiting
// as a "pending" task in the database
// Wait for service to be done .. it should materialize the document since it was
// waiting as a "pending" task in the database
tokio::time::sleep(Duration::from_millis(200)).await;

// Make sure the service did not crash and is still running
Expand All @@ -318,6 +337,85 @@ mod tests {
});
}

#[rstest]
fn materialize_unhandled_operations(
#[from(operation)]
#[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())]
operation: Operation,
key_pair: KeyPair,
) {
test_runner(move |node: TestNode| async move {
// Prepare arguments for service
let context = Context::new(
node.context.store.clone(),
KeyPair::new(),
Configuration::default(),
SchemaProvider::default(),
);

// Create an operation in the database which was not handled by the `reduce` task yet.
// This might happen for example when the node crashed right _after_ the operation
// arrived in the database but _before_ the `reduce` task kicked in.
let (entry_signed, _) =
send_to_store(&node.context.store, &operation, &doggo_schema(), &key_pair)
.await
.expect("Publish CREATE operation");
let document_id: DocumentId = entry_signed.hash().into();

// There should be one unhandled operation in the database
let unindexed_operation_ids = context
.store
.get_unindexed_operation_ids()
.await
.unwrap_or_else(|_| {
panic!("Failed database query when loading unindexed operation ids")
});
assert!(unindexed_operation_ids.len() == 1);

let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
let (tx, _) = broadcast::channel(1024);
let (tx_ready, rx_ready) = oneshot::channel::<()>();

// Start materializer service, it should pick up the un-indexed operation automatically
let tx_clone = tx.clone();
let handle = tokio::spawn(async move {
materializer_service(context, shutdown, tx_clone, tx_ready)
.await
.unwrap();
});

if rx_ready.await.is_err() {
panic!("Service dropped");
}

// Wait for service to be done .. it should materialize the document since it was
// waiting as a "pending" task in the database
tokio::time::sleep(Duration::from_millis(200)).await;

// Make sure the service did not crash and is still running
assert!(!handle.is_finished());

// Check database for materialized documents
let document = node
.context
.store
.get_document(&document_id)
.await
.unwrap()
.expect("We expect that the document is `Some`");
assert_eq!(document.id().to_string(), document_id.to_string());
assert_eq!(
document.get("username").unwrap().to_owned(),
OperationValue::String("bubu".into())
);
});
}

#[rstest]
fn materialize_update_document(
#[from(populate_store_config)]
Expand Down

0 comments on commit 568cf06

Please sign in to comment.