Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-apply unhandled operations during startup of materializer service #623

Merged
merged 3 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Re-materialize blobs which were only partially written to disc due to node crash [#618](https://github.com/p2panda/aquadoggo/pull/618)
- Include all logs for target schemas during replication [#620](https://github.com/p2panda/aquadoggo/pull/620)
- Re-apply unhandled operations during startup of materializer service [#623](https://github.com/p2panda/aquadoggo/pull/623)

## [0.7.3]

Expand Down
24 changes: 24 additions & 0 deletions aquadoggo/src/db/stores/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,30 @@ fn group_and_parse_operation_rows(
}

impl SqlStore {
/// Looks up every operation whose `sorted_index` column is still `NULL` and returns their
/// ids.
///
/// The `reduce` task writes the sorted index while it processes an operation, so a missing
/// index marks an operation which has not been handled by `reduce` yet.
///
/// # Errors
///
/// Returns `OperationStorageError::FatalStorageError` when the database query fails.
///
/// # Panics
///
/// Panics when a stored operation id can not be parsed into an `OperationId`.
pub async fn get_unindexed_operation_ids(
    &self,
) -> Result<Vec<OperationId>, OperationStorageError> {
    let raw_ids: Vec<String> = query_scalar(
        "
        SELECT
            operations_v1.operation_id
        FROM
            operations_v1
        WHERE
            operations_v1.sorted_index IS NULL
        ",
    )
    .fetch_all(&self.pool)
    .await
    .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?;

    // Ids were validated before they got stored, a parsing failure here indicates a corrupted
    // database
    let mut operation_ids = Vec::with_capacity(raw_ids.len());
    for raw_id in &raw_ids {
        let operation_id: OperationId = raw_id.parse().expect("invalid operation id in database");
        operation_ids.push(operation_id);
    }

    Ok(operation_ids)
}

/// Update the sorted index of an operation. This method is used in `reduce` tasks as each
/// operation is processed.
pub async fn update_operation_index(
Expand Down
172 changes: 135 additions & 37 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,47 +87,63 @@ pub async fn materializer_service(
factory.queue(task.to_owned());
});

// Subscribe to communication bus
let mut rx = tx.subscribe();

// Listen to incoming new entries and operations and move them into task queue
let handle = task::spawn(async move {
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
let handle = {
let context = context.clone();

// Subscribe to communication bus
let mut rx = tx.subscribe();

// Listen to incoming new entries and operations and move them into task queue
task::spawn(async move {
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving document id by operation_id {}",
operation_id
)
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
};
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
};
}
}
}
});
})
};

debug!("Materialiser service is ready");
if tx_ready.send(()).is_err() {
warn!("No subscriber informed about materialiser service being ready");
};

// Re-apply unmaterialized operations as they might have slipped through in an unexpected crash
// or node shutdown
let unindexed_operation_ids = context
.store
.get_unindexed_operation_ids()
.await
.unwrap_or_else(|_| panic!("Failed database query when loading unindexed operation ids"));

for id in unindexed_operation_ids {
let _ = tx.send(ServiceMessage::NewOperation(id));
}

// Wait until we received the application shutdown signal or handle closed
tokio::select! {
_ = handle => (),
Expand All @@ -144,6 +160,7 @@ mod tests {
use std::time::Duration;

use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::DocumentId;
use p2panda_rs::entry::traits::AsEncodedEntry;
use p2panda_rs::identity::KeyPair;
use p2panda_rs::operation::{Operation, OperationId, OperationValue};
Expand Down Expand Up @@ -175,7 +192,8 @@ mod tests {
) {
// Prepare database which inserts data for one document
test_runner(move |node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
// Populate the store with some entries and operations but DON'T materialise any
// resulting documents
let documents = populate_store(&node.context.store, &config).await;
let document_id = documents[0].id();

Expand Down Expand Up @@ -252,12 +270,13 @@ mod tests {
config: PopulateStoreConfig,
) {
test_runner(move |node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
// Populate the store with some entries and operations but DON'T materialise any
// resulting documents
let documents = populate_store(&node.context.store, &config).await;
let document_id = documents[0].id();

// Store a pending "reduce" task from last runtime in the database so it gets picked up by
// the materializer service
// Store a pending "reduce" task from last runtime in the database so it gets picked up
// by the materializer service
node.context
.store
.insert_task(&Task::new(
Expand Down Expand Up @@ -295,8 +314,8 @@ mod tests {
panic!("Service dropped");
}

// Wait for service to be done .. it should materialize the document since it was waiting
// as a "pending" task in the database
// Wait for service to be done .. it should materialize the document since it was
// waiting as a "pending" task in the database
tokio::time::sleep(Duration::from_millis(200)).await;

// Make sure the service did not crash and is still running
Expand All @@ -318,6 +337,85 @@ mod tests {
});
}

/// Ensures that an operation which reached the database but was never handled by a `reduce`
/// task gets materialized into a document when the materializer service starts up.
#[rstest]
fn materialize_unhandled_operations(
    #[from(operation)]
    #[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())]
    operation: Operation,
    key_pair: KeyPair,
) {
    test_runner(move |node: TestNode| async move {
        // Prepare arguments for service
        let context = Context::new(
            node.context.store.clone(),
            KeyPair::new(),
            Configuration::default(),
            SchemaProvider::default(),
        );

        // Create an operation in the database which was not handled by the `reduce` task yet.
        // This might happen for example when the node crashed right _after_ the operation
        // arrived in the database but _before_ the `reduce` task kicked in.
        let (entry_signed, _) =
            send_to_store(&node.context.store, &operation, &doggo_schema(), &key_pair)
                .await
                .expect("Publish CREATE operation");
        let document_id: DocumentId = entry_signed.hash().into();

        // There should be one unhandled operation in the database. Using `expect` keeps the
        // underlying storage error visible in the panic output if the query fails.
        let unindexed_operation_ids = context
            .store
            .get_unindexed_operation_ids()
            .await
            .expect("Failed database query when loading unindexed operation ids");
        assert_eq!(unindexed_operation_ids.len(), 1);

        let shutdown = task::spawn(async {
            loop {
                // Do this forever .. this means that the shutdown handler will never resolve
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        });
        let (tx, _) = broadcast::channel(1024);
        let (tx_ready, rx_ready) = oneshot::channel::<()>();

        // Start materializer service, it should pick up the un-indexed operation automatically
        let tx_clone = tx.clone();
        let handle = tokio::spawn(async move {
            materializer_service(context, shutdown, tx_clone, tx_ready)
                .await
                .unwrap();
        });

        if rx_ready.await.is_err() {
            panic!("Service dropped");
        }

        // Wait for service to be done .. it should materialize the document since the
        // un-indexed operation gets re-applied during startup of the service
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Make sure the service did not crash and is still running
        assert!(!handle.is_finished());

        // Check database for materialized documents
        let document = node
            .context
            .store
            .get_document(&document_id)
            .await
            .unwrap()
            .expect("We expect that the document is `Some`");
        assert_eq!(document.id().to_string(), document_id.to_string());
        assert_eq!(
            document.get("username").unwrap().to_owned(),
            OperationValue::String("bubu".into())
        );
    });
}

#[rstest]
fn materialize_update_document(
#[from(populate_store_config)]
Expand Down
Loading