From cc0db3d6cd032e5309b46ae724b3990c2196a904 Mon Sep 17 00:00:00 2001 From: adz Date: Sun, 16 Jun 2024 02:23:06 +0200 Subject: [PATCH] Gracefully wait until all tasks are done when shutting down materializer service --- aquadoggo/src/materializer/service.rs | 88 +++++++++++++++++---------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 9c9a3aca0..e1816e628 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -4,6 +4,7 @@ use anyhow::Result; use log::{debug, warn}; use p2panda_rs::storage_provider::traits::OperationStore; use tokio::task; +use tokio_stream::StreamExt; use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; @@ -13,6 +14,7 @@ use crate::materializer::tasks::{ }; use crate::materializer::worker::{Factory, Task, TaskStatus}; use crate::materializer::TaskInput; +use crate::shutdown::ShutdownHandler; /// Capacity of the internal broadcast channels used inside the worker factory. /// @@ -87,41 +89,65 @@ pub async fn materializer_service( factory.queue(task.to_owned()); }); + let mut shutdown_handler = ShutdownHandler::new(); + let mut shutdown_request_received = shutdown_handler.is_requested(); + // Subscribe to communication bus let mut rx = tx.subscribe(); // Listen to incoming new entries and operations and move them into task queue - let handle = task::spawn(async move { - loop { - if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { - // Resolve document id of regarding operation - let document_id = context - .store - .get_document_id_by_operation_id(&operation_id) - .await - .unwrap_or_else(|_| { - panic!( - "Failed database query when retrieving document id by operation_id {}", - operation_id - ) - }); - - match document_id { - Some(document_id) => { - // Dispatch "reduce" task which will materialize the regarding document. - factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id))) - } - None => { - // Panic when we couldn't find the regarding document in the database. We can - // safely assure that this is due to a critical bug affecting the database - // integrity. Panicking here will close `handle` and by that signal a node - // shutdown. - panic!("Could not find document for operation_id {}", operation_id); - } - }; + let handle = { + let mut shutdown_handler = shutdown_handler.clone(); + + task::spawn(async move { + loop { + tokio::select! { + Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv() => { + // Resolve document id of regarding operation + let document_id = context + .store + .get_document_id_by_operation_id(&operation_id) + .await + .unwrap_or_else(|_| { + panic!( + "Failed database query when retrieving document id by operation_id {}", + operation_id + ) + }); + + match document_id { + Some(document_id) => { + // Dispatch "reduce" task which will materialize the regarding document. + factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id))) + } + None => { + // Panic when we couldn't find the regarding document in the database. We can + // safely assure that this is due to a critical bug affecting the database + // integrity. Panicking here will close `handle` and by that signal a node + // shutdown. + panic!("Could not find document for operation_id {}", operation_id); + } + }; + }, + _ = shutdown_request_received.next() => { + // Stop handling incoming new operations and jump to next loop instead + break; + }, + } } - } - }); + + // Graceful shutdown: wait until the last worker has finished + loop { + if factory.is_all_empty() { + break; + } + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + shutdown_handler.set_done(); + }) + }; debug!("Materialiser service is ready"); if tx_ready.send(()).is_err() { @@ -132,7 +158,7 @@ pub async fn materializer_service( tokio::select! { _ = handle => (), _ = status_handle => (), - _ = shutdown => (), + _ = shutdown => shutdown_handler.is_done().await, _ = on_error => (), }