Skip to content

Commit

Permalink
Pruning skeleton (#1229)
Browse files Browse the repository at this point in the history
* Implement managed tasks for SubscriptionManager

* Fix clippy

* Add comment about trace task

* Use broadcast instead of mpsc for l2 blocks events

* Fix spawns in DaService

* Add TaskManager

* Use task manager in sequencer

* Document TaskManager

* Handle shutdown event

* Use TaskTracker

* Add comment about using a cancellation token

* Use JoinHandles instead of TaskTracker

* Use TaskManager in fullnode and prover

* Improve bitcoin-da service

* Force spawned tasks to accept a cancellation token

* Use biased polling

* Satisfy clippy

* WIP

* Add pruning tables

* Pruning skeleton implementation

* Use pruner in nodes

* Use biased polling based on order

* WIP

* Fix how config is done

* Derive default

* Add logs

* Let the tasks finish without panicing

* Use pruning config in fullnode and prover

* Add simple run test

* Use option instead of PruningMode

* Unneccessary changes

* l2_receiver

* Cleanup prints

* Use last pruned block in calculation

* Implement pruning criteria

* Lint and add comment

* Set the last_pruned_block to up_to_block value

* Don't store config internally

* Remove from constructor

* Should not change

* Move config value

* Remove pruning from sequencer / prover
  • Loading branch information
rakanalh authored Oct 4, 2024
1 parent 82bf52d commit 7c3440a
Show file tree
Hide file tree
Showing 21 changed files with 301 additions and 24 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"bin/citrea",
"crates/common",
"crates/primitives",
"crates/pruning",
"crates/bitcoin-da",
"crates/evm",
"crates/ethereum-rpc",
Expand Down
1 change: 1 addition & 0 deletions bin/citrea/tests/bitcoin_e2e/test_case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl<T: TestCase> TestCaseRunner<T> {
),
include_tx_body: true,
accept_public_input_as_proven: Some(true),
pruning_config: None,
sync_blocks_count: 10,
});

Expand Down
1 change: 1 addition & 0 deletions bin/citrea/tests/test_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub fn create_default_rollup_config(
sequencer_client_url: format!("http://localhost:{}", socket_addr.port()),
accept_public_input_as_proven: Some(true),
sync_blocks_count: 10,
pruning_config: None,
}),
NodeMode::SequencerNode => None,
},
Expand Down
20 changes: 0 additions & 20 deletions crates/bitcoin-da/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,8 +1132,6 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 1");

da_service
.send_transaction(DaData::SequencerCommitment(SequencerCommitment {
merkle_root: [14; 32],
Expand All @@ -1143,14 +1141,10 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 2");

println!("\n\nSend some BTC to this address: bcrt1qscttjdc3wypf7ttu0203sqgfz80a4q38cne693 and press enter\n\n");
let mut s = String::new();
std::io::stdin().read_line(&mut s).unwrap();

println!("sent 3");

let size = 2000;
let blob = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<u8>>();

Expand All @@ -1159,14 +1153,10 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 4");

println!("\n\nSend some BTC to this address: bcrt1qscttjdc3wypf7ttu0203sqgfz80a4q38cne693 and press enter\n\n");
let mut s = String::new();
std::io::stdin().read_line(&mut s).unwrap();

println!("sent 5");

let size = 600 * 1024;
let blob = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<u8>>();

Expand All @@ -1175,8 +1165,6 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 6");

// seq com different namespace
get_service_wrong_namespace()
.await
Expand All @@ -1196,8 +1184,6 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 7");

// seq com incorrect pubkey and sig
get_service_correct_sig_different_public_key()
.await
Expand All @@ -1218,8 +1204,6 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 8");

let size = 1200 * 1024;
let blob = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<u8>>();

Expand All @@ -1228,8 +1212,6 @@ mod tests {
.await
.expect("Failed to send transaction");

println!("sent 9");

da_service
.send_transaction(DaData::SequencerCommitment(SequencerCommitment {
merkle_root: [30; 32],
Expand All @@ -1238,8 +1220,6 @@ mod tests {
}))
.await
.expect("Failed to send transaction");

println!("sent 10");
}

// #[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions crates/fullnode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
# Citrea Deps
citrea-common = { path = "../common" }
citrea-primitives = { path = "../primitives", features = ["native"] }
citrea-pruning = { path = "../pruning" }
sequencer-client = { path = "../sequencer-client" }

# Sov SDK deps
Expand Down
15 changes: 15 additions & 0 deletions crates/fullnode/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use citrea_common::cache::L1BlockCache;
use citrea_common::da::get_da_block_at_height;
use citrea_common::tasks::manager::TaskManager;
use citrea_primitives::types::SoftConfirmationHash;
use citrea_pruning::{Pruner, PruningConfig};
use jsonrpsee::core::client::Error as JsonrpseeError;
use jsonrpsee::server::{BatchRequestConfig, RpcServiceBuilder, ServerBuilder};
use jsonrpsee::RpcModule;
Expand Down Expand Up @@ -66,6 +67,7 @@ where
sync_blocks_count: u64,
fork_manager: ForkManager,
soft_confirmation_tx: broadcast::Sender<u64>,
pruning_config: Option<PruningConfig>,
task_manager: TaskManager<()>,
}

Expand Down Expand Up @@ -151,6 +153,7 @@ where
l1_block_cache: Arc::new(Mutex::new(L1BlockCache::new())),
fork_manager,
soft_confirmation_tx,
pruning_config: runner_config.pruning_config,
task_manager: TaskManager::default(),
})
}
Expand Down Expand Up @@ -320,6 +323,18 @@ where
}
};

if let Some(config) = &self.pruning_config {
let pruner = Pruner::<DB>::new(
config.clone(),
self.ledger_db.get_last_pruned_l2_height()?.unwrap_or(0),
self.soft_confirmation_tx.subscribe(),
self.ledger_db.clone(),
);

self.task_manager
.spawn(|cancellation_token| pruner.run(cancellation_token));
}

let ledger_db = self.ledger_db.clone();
let da_service = self.da_service.clone();
let sequencer_pub_key = self.sequencer_pub_key.clone();
Expand Down
1 change: 1 addition & 0 deletions crates/fullnode/tests/runner_initialization_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ fn initialize_runner(
include_tx_body: true,
accept_public_input_as_proven: None,
sync_blocks_count: 10,
pruning_config: None,
}),
da: MockDaConfig {
sender_address: address,
Expand Down
29 changes: 29 additions & 0 deletions crates/pruning/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "citrea-pruning"
version.workspace = true
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
publish.workspace = true
repository.workspace = true

[dependencies]
# Citrea dependencies
citrea-evm = { path = "../evm", features = ["native"] }
citrea-primitives = { path = "../primitives", features = ["native"] }

# Sov SDK deps
sov-db = { path = "../sovereign-sdk/full-node/db/sov-db" }
sov-modules-api = { path = "../sovereign-sdk/module-system/sov-modules-api", default-features = false }

# 3rd-party dependencies
anyhow = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
25 changes: 25 additions & 0 deletions crates/pruning/src/criteria.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/// This defines the interface of a pruning criteria.
pub(crate) trait Criteria {
/// Decides whether pruning should be done or not.
///
/// If None is returned, no pruning should happen.
/// Otherwise, it will return the `up_to_block` value.
fn should_prune(&self, last_pruned_block: u64, current_block_number: u64) -> Option<u64>;
}

/// This distance criteria prunes blocks up to `last_pruned_block + distance`.
/// However, to keep `distance` amount of blocks, we have to wait for at least twice
/// the `distance` value to prune up to that point.
pub(crate) struct DistanceCriteria {
pub(crate) distance: u64,
}

impl Criteria for DistanceCriteria {
fn should_prune(&self, last_pruned_block: u64, current_block_number: u64) -> Option<u64> {
let trigger_block = last_pruned_block + (2 * self.distance) + 1;
if current_block_number >= trigger_block {
return Some(last_pruned_block + self.distance);
}
None
}
}
100 changes: 100 additions & 0 deletions crates/pruning/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use criteria::DistanceCriteria;
use futures::future;
use serde::{Deserialize, Serialize};
use sov_db::ledger_db::SharedLedgerOps;
use tokio::select;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::criteria::Criteria;
use crate::pruners::{prune_evm, prune_ledger};

mod criteria;
mod pruners;
#[cfg(test)]
mod tests;

/// A configuration type to define the behaviour of the pruner.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PruningConfig {
/// Defines the number of blocks from the tip of the chain to remove.
pub distance: u64,
}

impl Default for PruningConfig {
fn default() -> Self {
Self { distance: 256 }
}
}

pub struct Pruner<DB>
where
DB: SharedLedgerOps,
{
/// The last block number which was pruned.
last_pruned_block: u64,
/// A channel receiver which gets notified of new L2 blocks.
l2_receiver: broadcast::Receiver<u64>,
/// Access to ledger tables.
ledger_db: DB,
/// Criteria to decide pruning
criteria: Box<dyn Criteria + Send + Sync>,
}

impl<DB> Pruner<DB>
where
DB: SharedLedgerOps + Send + Sync + Clone + 'static,
{
pub fn new(
config: PruningConfig,
last_pruned_block: u64,
l2_receiver: broadcast::Receiver<u64>,
ledger_db: DB,
) -> Self {
// distance is the only criteria implemented at the moment.
let criteria = Box::new(DistanceCriteria {
distance: config.distance,
});
Self {
last_pruned_block,
l2_receiver,
ledger_db,
criteria,
}
}

/// Prune everything
pub async fn prune(&self, up_to_block: u64) {
info!("Pruning up to L2 block: {}", up_to_block);
let ledger_db = self.ledger_db.clone();
let ledger_pruning_handle =
tokio::task::spawn_blocking(move || prune_ledger(ledger_db, up_to_block));
let evm_pruning_handle = tokio::task::spawn_blocking(move || prune_evm(up_to_block));

future::join_all([ledger_pruning_handle, evm_pruning_handle]).await;
}

pub async fn run(mut self, cancellation_token: CancellationToken) {
loop {
select! {
biased;
_ = cancellation_token.cancelled() => {
// Store the last pruned l2 height in ledger DB to be restored in the next initialization.
if let Err(e) = self.ledger_db.set_last_pruned_l2_height(self.last_pruned_block) {
error!("Failed to store last pruned L2 height {}: {:?}", self.last_pruned_block, e);
}
return;
}
current_l2_block = self.l2_receiver.recv() => {
if let Ok(current_l2_block) = current_l2_block {
if let Some(up_to_block) = self.criteria.should_prune(self.last_pruned_block, current_l2_block) {
self.prune(up_to_block).await;
self.last_pruned_block = up_to_block;
}
}
},
}
}
}
}
10 changes: 10 additions & 0 deletions crates/pruning/src/pruners/evm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use citrea_evm::Evm;
use sov_modules_api::default_context::DefaultContext;
use tracing::debug;

/// Prune evm
pub(crate) fn prune_evm(up_to_block: u64) {
debug!("Pruning EVM, up to L2 block {}", up_to_block);
let _evm = Evm::<DefaultContext>::default();
// unimplemented!()
}
8 changes: 8 additions & 0 deletions crates/pruning/src/pruners/ledger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use sov_db::ledger_db::SharedLedgerOps;
use tracing::debug;

/// Prune ledger
pub(crate) fn prune_ledger<DB: SharedLedgerOps>(_ledger_db: DB, up_to_block: u64) {
debug!("Pruning Ledger, up to L2 block {}", up_to_block);
// unimplemented!()
}
5 changes: 5 additions & 0 deletions crates/pruning/src/pruners/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod evm;
mod ledger;

pub(crate) use evm::*;
pub(crate) use ledger::*;
Loading

0 comments on commit 7c3440a

Please sign in to comment.