From 568cf068f9540b1a5f2c9a5684d1baff41c77f22 Mon Sep 17 00:00:00 2001 From: adz Date: Sat, 15 Jun 2024 00:02:47 +0200 Subject: [PATCH] Pick up un-indexed operations when starting materializer service, add a test --- aquadoggo/src/materializer/service.rs | 172 ++++++++++++++++++++------ 1 file changed, 135 insertions(+), 37 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 9c9a3aca0..ff8873834 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -87,47 +87,63 @@ pub async fn materializer_service( factory.queue(task.to_owned()); }); - // Subscribe to communication bus - let mut rx = tx.subscribe(); - - // Listen to incoming new entries and operations and move them into task queue - let handle = task::spawn(async move { - loop { - if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { - // Resolve document id of regarding operation - let document_id = context - .store - .get_document_id_by_operation_id(&operation_id) - .await - .unwrap_or_else(|_| { - panic!( + let handle = { + let context = context.clone(); + + // Subscribe to communication bus + let mut rx = tx.subscribe(); + + // Listen to incoming new entries and operations and move them into task queue + task::spawn(async move { + loop { + if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { + // Resolve document id of regarding operation + let document_id = context + .store + .get_document_id_by_operation_id(&operation_id) + .await + .unwrap_or_else(|_| { + panic!( "Failed database query when retrieving document id by operation_id {}", operation_id ) - }); - - match document_id { - Some(document_id) => { - // Dispatch "reduce" task which will materialize the regarding document. - factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id))) - } - None => { - // Panic when we couldn't find the regarding document in the database. 
We can - // safely assure that this is due to a critical bug affecting the database - // integrity. Panicking here will close `handle` and by that signal a node - // shutdown. - panic!("Could not find document for operation_id {}", operation_id); - } - }; + }); + + match document_id { + Some(document_id) => { + // Dispatch "reduce" task which will materialize the regarding document. + factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id))) + } + None => { + // Panic when we couldn't find the regarding document in the database. We can + // safely assure that this is due to a critical bug affecting the database + // integrity. Panicking here will close `handle` and by that signal a node + // shutdown. + panic!("Could not find document for operation_id {}", operation_id); + } + }; + } } - } - }); + }) + }; debug!("Materialiser service is ready"); if tx_ready.send(()).is_err() { warn!("No subscriber informed about materialiser service being ready"); }; + // Re-apply unmaterialized operations as they might have slipped through in an unexpected crash + // or node shutdown + let unindexed_operation_ids = context + .store + .get_unindexed_operation_ids() + .await + .unwrap_or_else(|_| panic!("Failed database query when loading unindexed operation ids")); + + for id in unindexed_operation_ids { + let _ = tx.send(ServiceMessage::NewOperation(id)); + } + // Wait until we received the application shutdown signal or handle closed tokio::select! 
{ _ = handle => (), @@ -144,6 +160,7 @@ mod tests { use std::time::Duration; use p2panda_rs::document::traits::AsDocument; + use p2panda_rs::document::DocumentId; use p2panda_rs::entry::traits::AsEncodedEntry; use p2panda_rs::identity::KeyPair; use p2panda_rs::operation::{Operation, OperationId, OperationValue}; @@ -175,7 +192,8 @@ mod tests { ) { // Prepare database which inserts data for one document test_runner(move |node: TestNode| async move { - // Populate the store with some entries and operations but DON'T materialise any resulting documents. + // Populate the store with some entries and operations but DON'T materialise any + // resulting documents let documents = populate_store(&node.context.store, &config).await; let document_id = documents[0].id(); @@ -252,12 +270,13 @@ mod tests { config: PopulateStoreConfig, ) { test_runner(move |node: TestNode| async move { - // Populate the store with some entries and operations but DON'T materialise any resulting documents. + // Populate the store with some entries and operations but DON'T materialise any + // resulting documents let documents = populate_store(&node.context.store, &config).await; let document_id = documents[0].id(); - // Store a pending "reduce" task from last runtime in the database so it gets picked up by - // the materializer service + // Store a pending "reduce" task from last runtime in the database so it gets picked up + // by the materializer service node.context .store .insert_task(&Task::new( @@ -295,8 +314,8 @@ mod tests { panic!("Service dropped"); } - // Wait for service to be done .. it should materialize the document since it was waiting - // as a "pending" task in the database + // Wait for service to be done .. 
it should materialize the document since it was + // waiting as a "pending" task in the database tokio::time::sleep(Duration::from_millis(200)).await; // Make sure the service did not crash and is still running @@ -318,6 +337,85 @@ mod tests { }); } + #[rstest] + fn materialize_unhandled_operations( + #[from(operation)] + #[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())] + operation: Operation, + key_pair: KeyPair, + ) { + test_runner(move |node: TestNode| async move { + // Prepare arguments for service + let context = Context::new( + node.context.store.clone(), + KeyPair::new(), + Configuration::default(), + SchemaProvider::default(), + ); + + // Create an operation in the database which was not handled by the `reduce` task yet. + // This might happen for example when the node crashed right _after_ the operation + // arrived in the database but _before_ the `reduce` task kicked in. + let (entry_signed, _) = + send_to_store(&node.context.store, &operation, &doggo_schema(), &key_pair) + .await + .expect("Publish CREATE operation"); + let document_id: DocumentId = entry_signed.hash().into(); + + // There should be one unhandled operation in the database + let unindexed_operation_ids = context + .store + .get_unindexed_operation_ids() + .await + .unwrap_or_else(|_| { + panic!("Failed database query when loading unindexed operation ids") + }); + assert!(unindexed_operation_ids.len() == 1); + + let shutdown = task::spawn(async { + loop { + // Do this forever .. 
this means that the shutdown handler will never resolve + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + let (tx, _) = broadcast::channel(1024); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); + + // Start materializer service, it should pick up the un-indexed operation automatically + let tx_clone = tx.clone(); + let handle = tokio::spawn(async move { + materializer_service(context, shutdown, tx_clone, tx_ready) + .await + .unwrap(); + }); + + if rx_ready.await.is_err() { + panic!("Service dropped"); + } + + // Wait for service to be done .. it should materialize the document since its + // operation was picked up as "un-indexed" from the database on startup + tokio::time::sleep(Duration::from_millis(200)).await; + + // Make sure the service did not crash and is still running + assert!(!handle.is_finished()); + + // Check database for materialized documents + let document = node + .context + .store + .get_document(&document_id) + .await + .unwrap() + .expect("We expect that the document is `Some`"); + assert_eq!(document.id().to_string(), document_id.to_string()); + assert_eq!( + document.get("username").unwrap().to_owned(), + OperationValue::String("bubu".into()) + ); + }); + } + #[rstest] fn materialize_update_document( #[from(populate_store_config)]