From 17cfd7845589119922385025fa36ba61fa588b07 Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Sun, 12 Jan 2025 22:01:49 +0800 Subject: [PATCH] feat(rooch-da): enhance execution and rollback logic in DA (#3177) Refactored TxDAIndexer to include RoochStore for advanced transaction sequencing. Improved the rollback process with enhanced handling of startup and sequencer information. Updated execution modes for better clarity and refined RocksDB cache size parsing. ``` --- Cargo.lock | 1 + crates/rooch-common/Cargo.toml | 1 + crates/rooch-common/src/utils/humanize.rs | 81 ++++++++- crates/rooch/src/commands/da/commands/exec.rs | 171 ++++++++++++------ crates/rooch/src/commands/da/commands/mod.rs | 21 ++- .../src/commands/db/commands/dump_tx_root.rs | 1 + .../commands/get_execution_info_by_order.rs | 1 + 7 files changed, 207 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe19fefbd6..c35110d736 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10234,6 +10234,7 @@ dependencies = [ name = "rooch-common" version = "0.8.6" dependencies = [ + "anyhow 1.0.95", "libc", ] diff --git a/crates/rooch-common/Cargo.toml b/crates/rooch-common/Cargo.toml index 55bf3f4208..cee2e99c4c 100644 --- a/crates/rooch-common/Cargo.toml +++ b/crates/rooch-common/Cargo.toml @@ -13,3 +13,4 @@ rust-version = { workspace = true } [dependencies] libc = { workspace = true } +anyhow = { workspace = true } diff --git a/crates/rooch-common/src/utils/humanize.rs b/crates/rooch-common/src/utils/humanize.rs index fc5be74af4..d1154cef3f 100644 --- a/crates/rooch-common/src/utils/humanize.rs +++ b/crates/rooch-common/src/utils/humanize.rs @@ -1,6 +1,8 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use anyhow::bail; + const LOWER_BYTES_UNITS: [&str; 7] = ["b", "k", "m", "g", "t", "p", "e"]; pub fn human_readable_bytes(bytes: u64) -> String { @@ -15,6 +17,51 @@ pub fn human_readable_bytes(bytes: u64) -> String { format!("{:.2}{}", v, LOWER_BYTES_UNITS[unit_index]) } +/// Parses a string like "1k", "10M", etc., into the equivalent value in bytes (u64). +pub fn parse_bytes(input: &str) -> anyhow::Result { + if input.is_empty() { + bail!("Input is empty"); + } + + let chars = input.chars(); + let mut value_str = String::new(); + let mut unit = None; + + for c in chars { + if c.is_ascii_digit() || c == '.' { + value_str.push(c); + } else { + unit = Some(c.to_lowercase().to_string()); + break; + } + } + + // Parse the numeric value + let value: f64 = value_str + .parse() + .map_err(|_| anyhow::anyhow!("Invalid number format"))?; + + // Match the unit + let multiplier = if let Some(unit) = unit { + match LOWER_BYTES_UNITS.iter().position(|&u| u == unit.as_str()) { + Some(index) => 1024u64.saturating_pow(index as u32), + None => bail!("Unrecognized unit: {}", unit), + } + } else { + 1u64 // Default to bytes + }; + + // Compute the total value in bytes + let result = value * multiplier as f64; + + // Ensure it's within the range of u64 + if result > u64::MAX as f64 { + bail!("Value overflowed u64"); + } + + Ok(result as u64) +} + #[cfg(test)] mod tests { use super::*; @@ -23,16 +70,38 @@ mod tests { fn test_human_readable_bytes() { let test_cases = [ (0, "0.00b"), - (1024, "1.00k"), - (1024 * 1024, "1.00m"), - (1024 * 1024 * 1024, "1.00g"), - (1024_u64 * 1024 * 1024 * 1024, "1.00t"), - (1024_u64 * 1024 * 1024 * 1024 * 1024, "1.00p"), - (1024_u64 * 1024 * 1024 * 1024 * 1024 * 1024, "1.00e"), + (1 << 10, "1.00k"), + (1 << 20, "1.00m"), + (1 << 30, "1.00g"), + (1 << 40, "1.00t"), + (1 << 50, "1.00p"), + (1 << 60, "1.00e"), ]; for (bytes, expected) in test_cases.iter() { assert_eq!(human_readable_bytes(*bytes), *expected); } } + + #[test] + fn test_parse_bytes() { + let test_cases = [ + ("0", 0), + ("1", 1), + ("1.5", 1), + ("1.5K", (1 << 10) * 3 / 2), + ("1.4K", 1433), + ("1.5M", (1 << 20) * 3 / 2), + ("1.5G", (1 << 30) * 3 / 2), + ("1.5T", (1 << 40) * 3 / 2), + ("1.5P", (1 << 50) * 3 / 2), + ("1.5E", (1 << 60) * 3 / 2), + ("1.5k", 1536), + ("1.512k", 1548), + ]; + + for (input, expected) in test_cases.iter() { + assert_eq!(parse_bytes(input).unwrap(), *expected); + } + } } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index a14c8925fe..59f894e5c6 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -21,6 +21,7 @@ use moveos_types::startup_info; use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction}; use raw_store::rocks::batch::WriteBatch; use raw_store::traits::DBStore; +use rooch_common::humanize::parse_bytes; use rooch_config::R_OPT_NET_HELP; use rooch_db::RoochDB; use rooch_event::actor::EventActor; @@ -28,11 +29,16 @@ use rooch_executor::actor::executor::ExecutorActor; use rooch_executor::actor::reader_executor::ReaderExecutorActor; use rooch_executor::proxy::ExecutorProxy; use rooch_pipeline_processor::actor::processor::is_vm_panic_error; +use rooch_store::meta_store::SEQUENCER_INFO_KEY; +use rooch_store::META_SEQUENCER_INFO_COLUMN_FAMILY_NAME; use rooch_types::bitcoin::types::Block as BitcoinBlock; use rooch_types::error::RoochResult; use rooch_types::rooch_network::RoochChainID; -use rooch_types::transaction::{L1BlockWithBody, LedgerTransaction, LedgerTxData}; -use std::cmp::min; +use rooch_types::sequencer::SequencerInfo; +use rooch_types::transaction::{ + L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo, +}; +use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader, Read}; @@ -52,7 +58,8 @@ use tracing::info; pub struct ExecCommand { #[clap( long = "mode", - help = "execute mode: exec, seq, full. exec: only execute transactions, no update sequence related data; full: execute transactions and update sequence related data" + default_value = "all", + help = "Execution mode: exec, seq, all. Default is all" )] pub mode: ExecMode, #[clap(long = "segment-dir")] @@ -90,14 +97,18 @@ pub struct ExecCommand { #[clap(long = "btc-local-block-store-dir")] pub btc_local_block_store_dir: Option, - #[clap(name = "rocksdb-row-cache-size", long, help = "rocksdb row cache size")] - pub row_cache_size: Option, + #[clap( + name = "rocksdb-row-cache-size", + long, + help = "rocksdb row cache size, default 128M" + )] + pub row_cache_size: Option, #[clap( name = "rocksdb-block-cache-size", long, - help = "rocksdb block cache size" + help = "rocksdb block cache size, default 4G" )] - pub block_cache_size: Option, + pub block_cache_size: Option, #[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")] pub enable_rocks_stats: bool, } @@ -106,7 +117,7 @@ pub struct ExecCommand { pub enum ExecMode { Exec, // Only execute transactions, no sequence updates Seq, // Only update sequence data, no execution - Both, // Execute transactions and update sequence data + All, // Execute transactions and update sequence data } impl ExecMode { @@ -114,19 +125,19 @@ impl ExecMode { match self { ExecMode::Exec => 0b10, // Execute ExecMode::Seq => 0b01, // Sequence - ExecMode::Both => 0b11, // Both + ExecMode::All => 0b11, // All } } - pub fn is_exec(&self) -> bool { + pub fn need_exec(&self) -> bool { self.as_bits() & 0b10 != 0 } - pub fn is_seq(&self) -> bool { + pub fn need_seq(&self) -> bool { self.as_bits() & 0b01 != 0 } - pub fn is_both(&self) -> bool { + pub fn need_all(&self) -> bool { self.as_bits() == 0b11 } @@ -134,7 +145,7 @@ impl ExecMode { match self { ExecMode::Exec => "state root", ExecMode::Seq => "accumulator root", - ExecMode::Both => "state+accumulator root", + ExecMode::All => "state+accumulator root", } .to_string() } @@ -150,13 +161,22 @@ impl ExecCommand { async fn build_exec_inner(&self) -> anyhow::Result { let actor_system = ActorSystem::global_system(); + let row_cache_size = self + .row_cache_size + .clone() + .and_then(|v| parse_bytes(&v).ok()); + let block_cache_size = self + .block_cache_size + .clone() + .and_then(|v| parse_bytes(&v).ok()); + let (executor, moveos_store, rooch_db) = build_executor_and_store( self.base_data_dir.clone(), self.chain_id.clone(), &actor_system, self.enable_rocks_stats, - self.row_cache_size, - self.block_cache_size, + row_cache_size, + block_cache_size, ) .await?; @@ -176,6 +196,7 @@ impl ExecCommand { let tx_da_indexer = TxDAIndexer::load_from_file( self.order_hash_path.clone(), moveos_store.transaction_store, + rooch_db.rooch_store.clone(), )?; Ok(ExecInner { mode: self.mode, @@ -318,67 +339,83 @@ impl ExecInner { fn update_startup_info_after_rollback( &self, - execution_info: TransactionExecutionInfo, + execution_info: Option, + sequencer_info: Option, ) -> anyhow::Result<()> { - let rollback_startup_info = - startup_info::StartupInfo::new(execution_info.state_root, execution_info.size); + let rollback_sequencer_info = if let Some(sequencer_info) = sequencer_info { + Some(SequencerInfo::new( + sequencer_info.tx_order, + sequencer_info.tx_accumulator_info(), + )) + } else { + None + }; + let rollback_startup_info = if let Some(execution_info) = execution_info { + Some(startup_info::StartupInfo::new( + execution_info.state_root, + execution_info.size, + )) + } else { + None + }; let inner_store = &self.rooch_db.rooch_store.store_instance; let mut write_batch = WriteBatch::new(); - let cf_names = vec![CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME]; - - write_batch.put( - to_bytes(STARTUP_INFO_KEY).unwrap(), - to_bytes(&rollback_startup_info).unwrap(), - )?; + let mut cf_names = Vec::new(); + if let Some(rollback_sequencer_info) = rollback_sequencer_info { + cf_names.push(META_SEQUENCER_INFO_COLUMN_FAMILY_NAME); + write_batch.put( + to_bytes(SEQUENCER_INFO_KEY).unwrap(), + to_bytes(&rollback_sequencer_info).unwrap(), + )?; + } + if let Some(rollback_startup_info) = rollback_startup_info { + cf_names.push(CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME); + write_batch.put( + to_bytes(STARTUP_INFO_KEY).unwrap(), + to_bytes(&rollback_startup_info).unwrap(), + )?; + } inner_store.write_batch_across_cfs(cf_names, write_batch, true) } async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { let last_executed_opt = self.tx_da_indexer.find_last_executed()?; + let last_executed_tx_order = match last_executed_opt.clone() { + Some(v) => v.tx_order, + None => 0, + }; + let mut next_tx_order = last_executed_tx_order + 1; + let last_sequenced_tx = self.sequenced_tx_store.get_last_tx_order(); - let mut next_tx_order = last_executed_opt - .clone() - .map(|v| v.tx_order + 1) - .unwrap_or(1); + let next_sequence_tx = last_sequenced_tx + 1; - if self.mode.is_both() && next_tx_order != last_sequenced_tx + 1 { - let last_executed_tx_order = match last_executed_opt { - Some(v) => v.tx_order, - None => 0, - }; + let last_full_executed_tx_order = min(last_sequenced_tx, last_executed_tx_order); + let last_partial_executed_tx_order = max(last_sequenced_tx, last_executed_tx_order); + + let mut rollback_to = self.rollback; + if self.mode.need_all() && next_tx_order != next_sequence_tx { info! { "Last executed tx order: {}, last sequenced tx order: {}, need rollback to tx order: {}", last_executed_tx_order, last_sequenced_tx, - min(last_sequenced_tx, last_executed_tx_order) + last_full_executed_tx_order }; - return Ok(()); - } - - let mut next_block_number = last_executed_opt - .clone() - .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block - .unwrap_or(0); - - if !self.mode.is_exec() { - next_tx_order = last_sequenced_tx + 1; - next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + if rollback_to.is_none() { + rollback_to = Some(last_full_executed_tx_order); + } else { + rollback_to = Some(min(rollback_to.unwrap(), last_full_executed_tx_order)); + } } - info!( - "next_tx_order: {:?}. need rollback soon: {:?}", - next_tx_order, - self.rollback.is_some() - ); - - // If rollback not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order - if let (Some(rollback), Some(last_executed)) = (self.rollback, last_executed_opt.clone()) { - let last_executed_tx_order = last_executed.tx_order; - if rollback < last_executed_tx_order { - let new_last_and_rollback = - self.tx_da_indexer.slice(rollback, last_executed_tx_order)?; + // If rollback not set or ge `last_partial_executed_tx_order`: nothing to do; + // otherwise, rollback to this order + if let Some(rollback) = rollback_to { + if rollback < last_partial_executed_tx_order { + let new_last_and_rollback = self + .tx_da_indexer + .slice(rollback, last_partial_executed_tx_order)?; // split into two parts, the first get execution info for new startup, all others rollback let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); info!( @@ -399,12 +436,26 @@ impl ExecInner { } let rollback_execution_info = self.tx_da_indexer.get_execution_info(new_last.tx_hash)?; - self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?; + let rollback_sequencer_info = + self.tx_da_indexer.get_sequencer_info(new_last.tx_hash)?; + self.update_startup_info_after_rollback( + rollback_execution_info, + rollback_sequencer_info, + )?; info!("Rollback transactions done. Please RESTART process without rollback."); return Ok(()); // rollback done, need to restart to get new state_root for startup rooch store } }; + let mut next_block_number = last_executed_opt + .clone() + .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block + .unwrap_or(0); + + if !self.mode.need_exec() { + next_tx_order = last_sequenced_tx + 1; + next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + } info!( "Start to produce transactions from tx_order: {}, check from block: {}", next_tx_order, next_block_number, @@ -523,12 +574,12 @@ impl ExecInner { let exp_state_root = exp_root_opt.map(|v| v.0); let exp_accumulator_root = exp_root_opt.map(|v| v.1); - if self.mode.is_seq() { + if self.mode.need_seq() { self.sequenced_tx_store .store_tx(ledger_tx.clone(), exp_accumulator_root)?; } - if self.mode.is_exec() { + if self.mode.need_exec() { let moveos_tx = self .validate_ledger_transaction(ledger_tx, l1_block_with_body) .await?; diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index a96340d2c1..8e3017dc82 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -15,7 +15,7 @@ use rooch_types::da::chunk::chunk_from_segments; use rooch_types::da::segment::{segment_from_bytes, SegmentID}; use rooch_types::rooch_network::RoochChainID; use rooch_types::sequencer::SequencerInfo; -use rooch_types::transaction::LedgerTransaction; +use rooch_types::transaction::{LedgerTransaction, TransactionSequenceInfo}; use std::collections::HashMap; use std::fs; use std::fs::File; @@ -299,12 +299,14 @@ impl std::str::FromStr for TxDAIndex { pub struct TxDAIndexer { tx_order_hash_blocks: Vec, transaction_store: TransactionDBStore, + rooch_store: RoochStore, } impl TxDAIndexer { pub fn load_from_file( file_path: PathBuf, transaction_store: TransactionDBStore, + rooch_store: RoochStore, ) -> anyhow::Result { let mut tx_order_hashes = Vec::with_capacity(70000000); let mut reader = BufReader::new(File::open(file_path)?); @@ -314,13 +316,14 @@ impl TxDAIndexer { tx_order_hashes.push(item); } tx_order_hashes.sort_by(|a, b| a.tx_order.cmp(&b.tx_order)); // avoiding wrong order - tracing::info!( + info!( "tx_order:tx_hash:block indexer loaded, tx cnt: {}", tx_order_hashes.len() ); Ok(TxDAIndexer { tx_order_hash_blocks: tx_order_hashes, transaction_store, + rooch_store, }) } @@ -383,8 +386,18 @@ impl TxDAIndexer { &self, tx_hash: H256, ) -> anyhow::Result> { - let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; - Ok(execution_info) + self.transaction_store.get_tx_execution_info(tx_hash) + } + + pub fn get_sequencer_info( + &self, + tx_hash: H256, + ) -> anyhow::Result> { + Ok(self + .rooch_store + .transaction_store + .get_transaction_by_hash(tx_hash)? + .map(|transaction| transaction.sequence_info)) } pub fn get_execution_info_by_order( diff --git a/crates/rooch/src/commands/db/commands/dump_tx_root.rs b/crates/rooch/src/commands/db/commands/dump_tx_root.rs index 6472e67e30..c7c5794118 100644 --- a/crates/rooch/src/commands/db/commands/dump_tx_root.rs +++ b/crates/rooch/src/commands/db/commands/dump_tx_root.rs @@ -43,6 +43,7 @@ impl DumpTxRootCommand { let tx_da_indexer = TxDAIndexer::load_from_file( self.order_hash_path.clone(), moveos_store.transaction_store, + rooch_db.rooch_store.clone(), )?; let file = File::create(self.output.clone())?; diff --git a/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs b/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs index 3ea6ef001b..ccea833bd9 100644 --- a/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs +++ b/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs @@ -38,6 +38,7 @@ impl GetExecutionInfoByOrderCommand { let tx_da_indexer = TxDAIndexer::load_from_file( self.order_hash_path.clone(), moveos_store.transaction_store, + rooch_db.rooch_store.clone(), )?; let tx_order = self.order;