Skip to content

Commit

Permalink
Gracefully wait until all tasks are done when shutting down materiali…
Browse files Browse the repository at this point in the history
…zer service
  • Loading branch information
adzialocha committed Jun 16, 2024
1 parent 4a4747b commit cc0db3d
Showing 1 changed file with 57 additions and 31 deletions.
88 changes: 57 additions & 31 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Result;
use log::{debug, warn};
use p2panda_rs::storage_provider::traits::OperationStore;
use tokio::task;
use tokio_stream::StreamExt;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
Expand All @@ -13,6 +14,7 @@ use crate::materializer::tasks::{
};
use crate::materializer::worker::{Factory, Task, TaskStatus};
use crate::materializer::TaskInput;
use crate::shutdown::ShutdownHandler;

/// Capacity of the internal broadcast channels used inside the worker factory.
///
Expand Down Expand Up @@ -87,41 +89,65 @@ pub async fn materializer_service(
factory.queue(task.to_owned());
});

let mut shutdown_handler = ShutdownHandler::new();
let mut shutdown_request_received = shutdown_handler.is_requested();

// Subscribe to communication bus
let mut rx = tx.subscribe();

// Listen to incoming new entries and operations and move them into task queue
let handle = task::spawn(async move {
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving document id by operation_id {}",
operation_id
)
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
};
let handle = {
let mut shutdown_handler = shutdown_handler.clone();

task::spawn(async move {
loop {
tokio::select! {
Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv() => {
// Resolve document id of regarding operation
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving document id by operation_id {}",
operation_id
)
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
};
},
_ = shutdown_request_received.next() => {
// Stop handling incoming new operations and jump to next loop instead
break;
},
}
}
}
});

// Graceful shutdown: wait until the last worker has finished
loop {
if factory.is_all_empty() {
break;
}

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

shutdown_handler.set_done();
})
};

debug!("Materialiser service is ready");
if tx_ready.send(()).is_err() {
Expand All @@ -132,7 +158,7 @@ pub async fn materializer_service(
tokio::select! {
_ = handle => (),
_ = status_handle => (),
_ = shutdown => (),
_ = shutdown => shutdown_handler.is_done().await,
_ = on_error => (),
}

Expand Down

0 comments on commit cc0db3d

Please sign in to comment.