diff --git a/Cargo.lock b/Cargo.lock index fe19fefbd6..c35110d736 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10234,6 +10234,7 @@ dependencies = [ name = "rooch-common" version = "0.8.6" dependencies = [ + "anyhow 1.0.95", "libc", ] diff --git a/crates/rooch-common/Cargo.toml b/crates/rooch-common/Cargo.toml index 55bf3f4208..cee2e99c4c 100644 --- a/crates/rooch-common/Cargo.toml +++ b/crates/rooch-common/Cargo.toml @@ -13,3 +13,4 @@ rust-version = { workspace = true } [dependencies] libc = { workspace = true } +anyhow = { workspace = true } diff --git a/crates/rooch-common/src/utils/humanize.rs b/crates/rooch-common/src/utils/humanize.rs index fc5be74af4..d1154cef3f 100644 --- a/crates/rooch-common/src/utils/humanize.rs +++ b/crates/rooch-common/src/utils/humanize.rs @@ -1,6 +1,8 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use anyhow::bail; + const LOWER_BYTES_UNITS: [&str; 7] = ["b", "k", "m", "g", "t", "p", "e"]; pub fn human_readable_bytes(bytes: u64) -> String { @@ -15,6 +17,51 @@ pub fn human_readable_bytes(bytes: u64) -> String { format!("{:.2}{}", v, LOWER_BYTES_UNITS[unit_index]) } +/// Parses a string like "1k", "10M", etc., into the equivalent value in bytes (u64). +pub fn parse_bytes(input: &str) -> anyhow::Result { + if input.is_empty() { + bail!("Input is empty"); + } + + let chars = input.chars(); + let mut value_str = String::new(); + let mut unit = None; + + for c in chars { + if c.is_ascii_digit() || c == '.' { + value_str.push(c); + } else { + unit = Some(c.to_lowercase().to_string()); + break; + } + } + + // Parse the numeric value + let value: f64 = value_str + .parse() + .map_err(|_| anyhow::anyhow!("Invalid number format"))?; + + // Match the unit + let multiplier = if let Some(unit) = unit { + match LOWER_BYTES_UNITS.iter().position(|&u| u == unit.as_str()) { + Some(index) => 1024u64.saturating_pow(index as u32), + None => bail!("Unrecognized unit: {}", unit), + } + } else { + 1u64 // Default to bytes + }; + + // Compute the total value in bytes + let result = value * multiplier as f64; + + // Ensure it's within the range of u64 + if result > u64::MAX as f64 { + bail!("Value overflowed u64"); + } + + Ok(result as u64) +} + #[cfg(test)] mod tests { use super::*; @@ -23,16 +70,38 @@ mod tests { fn test_human_readable_bytes() { let test_cases = [ (0, "0.00b"), - (1024, "1.00k"), - (1024 * 1024, "1.00m"), - (1024 * 1024 * 1024, "1.00g"), - (1024_u64 * 1024 * 1024 * 1024, "1.00t"), - (1024_u64 * 1024 * 1024 * 1024 * 1024, "1.00p"), - (1024_u64 * 1024 * 1024 * 1024 * 1024 * 1024, "1.00e"), + (1 << 10, "1.00k"), + (1 << 20, "1.00m"), + (1 << 30, "1.00g"), + (1 << 40, "1.00t"), + (1 << 50, "1.00p"), + (1 << 60, "1.00e"), ]; for (bytes, expected) in test_cases.iter() { assert_eq!(human_readable_bytes(*bytes), *expected); } } + + #[test] + fn test_parse_bytes() { + let test_cases = [ + ("0", 0), + ("1", 1), + ("1.5", 1), + ("1.5K", (1 << 10) * 3 / 2), + ("1.4K", 1433), + ("1.5M", (1 << 20) * 3 / 2), + ("1.5G", (1 << 30) * 3 / 2), + ("1.5T", (1 << 40) * 3 / 2), + ("1.5P", (1 << 50) * 3 / 2), + ("1.5E", (1 << 60) * 3 / 2), + ("1.5k", 1536), + ("1.512k", 1548), + ]; + + for (input, expected) in test_cases.iter() { + assert_eq!(parse_bytes(input).unwrap(), *expected); + } + } } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index a14c8925fe..59f894e5c6 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -21,6 +21,7 @@ use moveos_types::startup_info; use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction}; use raw_store::rocks::batch::WriteBatch; use raw_store::traits::DBStore; +use rooch_common::humanize::parse_bytes; use rooch_config::R_OPT_NET_HELP; use rooch_db::RoochDB; use rooch_event::actor::EventActor; @@ -28,11 +29,16 @@ use rooch_executor::actor::executor::ExecutorActor; use rooch_executor::actor::reader_executor::ReaderExecutorActor; use rooch_executor::proxy::ExecutorProxy; use rooch_pipeline_processor::actor::processor::is_vm_panic_error; +use rooch_store::meta_store::SEQUENCER_INFO_KEY; +use rooch_store::META_SEQUENCER_INFO_COLUMN_FAMILY_NAME; use rooch_types::bitcoin::types::Block as BitcoinBlock; use rooch_types::error::RoochResult; use rooch_types::rooch_network::RoochChainID; -use rooch_types::transaction::{L1BlockWithBody, LedgerTransaction, LedgerTxData}; -use std::cmp::min; +use rooch_types::sequencer::SequencerInfo; +use rooch_types::transaction::{ + L1BlockWithBody, LedgerTransaction, LedgerTxData, TransactionSequenceInfo, +}; +use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader, Read}; @@ -52,7 +58,8 @@ use tracing::info; pub struct ExecCommand { #[clap( long = "mode", - help = "execute mode: exec, seq, full. exec: only execute transactions, no update sequence related data; full: execute transactions and update sequence related data" + default_value = "all", + help = "Execution mode: exec, seq, all. Default is all" )] pub mode: ExecMode, #[clap(long = "segment-dir")] @@ -90,14 +97,18 @@ pub struct ExecCommand { #[clap(long = "btc-local-block-store-dir")] pub btc_local_block_store_dir: Option, - #[clap(name = "rocksdb-row-cache-size", long, help = "rocksdb row cache size")] - pub row_cache_size: Option, + #[clap( + name = "rocksdb-row-cache-size", + long, + help = "rocksdb row cache size, default 128M" + )] + pub row_cache_size: Option, #[clap( name = "rocksdb-block-cache-size", long, - help = "rocksdb block cache size" + help = "rocksdb block cache size, default 4G" )] - pub block_cache_size: Option, + pub block_cache_size: Option, #[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")] pub enable_rocks_stats: bool, } @@ -106,7 +117,7 @@ pub struct ExecCommand { pub enum ExecMode { Exec, // Only execute transactions, no sequence updates Seq, // Only update sequence data, no execution - Both, // Execute transactions and update sequence data + All, // Execute transactions and update sequence data } impl ExecMode { @@ -114,19 +125,19 @@ impl ExecMode { match self { ExecMode::Exec => 0b10, // Execute ExecMode::Seq => 0b01, // Sequence - ExecMode::Both => 0b11, // Both + ExecMode::All => 0b11, // All } } - pub fn is_exec(&self) -> bool { + pub fn need_exec(&self) -> bool { self.as_bits() & 0b10 != 0 } - pub fn is_seq(&self) -> bool { + pub fn need_seq(&self) -> bool { self.as_bits() & 0b01 != 0 } - pub fn is_both(&self) -> bool { + pub fn need_all(&self) -> bool { self.as_bits() == 0b11 } @@ -134,7 +145,7 @@ impl ExecMode { match self { ExecMode::Exec => "state root", ExecMode::Seq => "accumulator root", - ExecMode::Both => "state+accumulator root", + ExecMode::All => "state+accumulator root", } .to_string() } @@ -150,13 +161,22 @@ impl ExecCommand { async fn build_exec_inner(&self) -> anyhow::Result { let actor_system = ActorSystem::global_system(); + let row_cache_size = self + .row_cache_size + .clone() + .and_then(|v| parse_bytes(&v).ok()); + let block_cache_size = self + .block_cache_size + .clone() + .and_then(|v| parse_bytes(&v).ok()); + let (executor, moveos_store, rooch_db) = build_executor_and_store( self.base_data_dir.clone(), self.chain_id.clone(), &actor_system, self.enable_rocks_stats, - self.row_cache_size, - self.block_cache_size, + row_cache_size, + block_cache_size, ) .await?; @@ -176,6 +196,7 @@ impl ExecCommand { let tx_da_indexer = TxDAIndexer::load_from_file( self.order_hash_path.clone(), moveos_store.transaction_store, + rooch_db.rooch_store.clone(), )?; Ok(ExecInner { mode: self.mode, @@ -318,67 +339,83 @@ impl ExecInner { fn update_startup_info_after_rollback( &self, - execution_info: TransactionExecutionInfo, + execution_info: Option, + sequencer_info: Option, ) -> anyhow::Result<()> { - let rollback_startup_info = - startup_info::StartupInfo::new(execution_info.state_root, execution_info.size); + let rollback_sequencer_info = if let Some(sequencer_info) = sequencer_info { + Some(SequencerInfo::new( + sequencer_info.tx_order, + sequencer_info.tx_accumulator_info(), + )) + } else { + None + }; + let rollback_startup_info = if let Some(execution_info) = execution_info { + Some(startup_info::StartupInfo::new( + execution_info.state_root, + execution_info.size, + )) + } else { + None + }; let inner_store = &self.rooch_db.rooch_store.store_instance; let mut write_batch = WriteBatch::new(); - let cf_names = vec![CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME]; - - write_batch.put( - to_bytes(STARTUP_INFO_KEY).unwrap(), - to_bytes(&rollback_startup_info).unwrap(), - )?; + let mut cf_names = Vec::new(); + if let Some(rollback_sequencer_info) = rollback_sequencer_info { + cf_names.push(META_SEQUENCER_INFO_COLUMN_FAMILY_NAME); + write_batch.put( + to_bytes(SEQUENCER_INFO_KEY).unwrap(), + to_bytes(&rollback_sequencer_info).unwrap(), + )?; + } + if let Some(rollback_startup_info) = rollback_startup_info { + cf_names.push(CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME); + write_batch.put( + to_bytes(STARTUP_INFO_KEY).unwrap(), + to_bytes(&rollback_startup_info).unwrap(), + )?; + } inner_store.write_batch_across_cfs(cf_names, write_batch, true) } async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { let last_executed_opt = self.tx_da_indexer.find_last_executed()?; + let last_executed_tx_order = match last_executed_opt.clone() { + Some(v) => v.tx_order, + None => 0, + }; + let mut next_tx_order = last_executed_tx_order + 1; + let last_sequenced_tx = self.sequenced_tx_store.get_last_tx_order(); - let mut next_tx_order = last_executed_opt - .clone() - .map(|v| v.tx_order + 1) - .unwrap_or(1); + let next_sequence_tx = last_sequenced_tx + 1; - if self.mode.is_both() && next_tx_order != last_sequenced_tx + 1 { - let last_executed_tx_order = match last_executed_opt { - Some(v) => v.tx_order, - None => 0, - }; + let last_full_executed_tx_order = min(last_sequenced_tx, last_executed_tx_order); + let last_partial_executed_tx_order = max(last_sequenced_tx, last_executed_tx_order); + + let mut rollback_to = self.rollback; + if self.mode.need_all() && next_tx_order != next_sequence_tx { info! { "Last executed tx order: {}, last sequenced tx order: {}, need rollback to tx order: {}", last_executed_tx_order, last_sequenced_tx, - min(last_sequenced_tx, last_executed_tx_order) + last_full_executed_tx_order }; - return Ok(()); - } - - let mut next_block_number = last_executed_opt - .clone() - .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block - .unwrap_or(0); - - if !self.mode.is_exec() { - next_tx_order = last_sequenced_tx + 1; - next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + if rollback_to.is_none() { + rollback_to = Some(last_full_executed_tx_order); + } else { + rollback_to = Some(min(rollback_to.unwrap(), last_full_executed_tx_order)); + } } - info!( - "next_tx_order: {:?}. need rollback soon: {:?}", - next_tx_order, - self.rollback.is_some() - ); - - // If rollback not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order - if let (Some(rollback), Some(last_executed)) = (self.rollback, last_executed_opt.clone()) { - let last_executed_tx_order = last_executed.tx_order; - if rollback < last_executed_tx_order { - let new_last_and_rollback = - self.tx_da_indexer.slice(rollback, last_executed_tx_order)?; + // If rollback not set or ge `last_partial_executed_tx_order`: nothing to do; + // otherwise, rollback to this order + if let Some(rollback) = rollback_to { + if rollback < last_partial_executed_tx_order { + let new_last_and_rollback = self + .tx_da_indexer + .slice(rollback, last_partial_executed_tx_order)?; // split into two parts, the first get execution info for new startup, all others rollback let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); info!( @@ -399,12 +436,26 @@ impl ExecInner { } let rollback_execution_info = self.tx_da_indexer.get_execution_info(new_last.tx_hash)?; - self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?; + let rollback_sequencer_info = + self.tx_da_indexer.get_sequencer_info(new_last.tx_hash)?; + self.update_startup_info_after_rollback( + rollback_execution_info, + rollback_sequencer_info, + )?; info!("Rollback transactions done. Please RESTART process without rollback."); return Ok(()); // rollback done, need to restart to get new state_root for startup rooch store } }; + let mut next_block_number = last_executed_opt + .clone() + .map(|v| v.block_number) // next_tx_order and last executed tx may be in the same block + .unwrap_or(0); + + if !self.mode.need_exec() { + next_tx_order = last_sequenced_tx + 1; + next_block_number = self.tx_da_indexer.find_tx_block(next_tx_order).unwrap(); + } info!( "Start to produce transactions from tx_order: {}, check from block: {}", next_tx_order, next_block_number, @@ -523,12 +574,12 @@ impl ExecInner { let exp_state_root = exp_root_opt.map(|v| v.0); let exp_accumulator_root = exp_root_opt.map(|v| v.1); - if self.mode.is_seq() { + if self.mode.need_seq() { self.sequenced_tx_store .store_tx(ledger_tx.clone(), exp_accumulator_root)?; } - if self.mode.is_exec() { + if self.mode.need_exec() { let moveos_tx = self .validate_ledger_transaction(ledger_tx, l1_block_with_body) .await?; diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index a96340d2c1..8e3017dc82 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -15,7 +15,7 @@ use rooch_types::da::chunk::chunk_from_segments; use rooch_types::da::segment::{segment_from_bytes, SegmentID}; use rooch_types::rooch_network::RoochChainID; use rooch_types::sequencer::SequencerInfo; -use rooch_types::transaction::LedgerTransaction; +use rooch_types::transaction::{LedgerTransaction, TransactionSequenceInfo}; use std::collections::HashMap; use std::fs; use std::fs::File; @@ -299,12 +299,14 @@ impl std::str::FromStr for TxDAIndex { pub struct TxDAIndexer { tx_order_hash_blocks: Vec, transaction_store: TransactionDBStore, + rooch_store: RoochStore, } impl TxDAIndexer { pub fn load_from_file( file_path: PathBuf, transaction_store: TransactionDBStore, + rooch_store: RoochStore, ) -> anyhow::Result { let mut tx_order_hashes = Vec::with_capacity(70000000); let mut reader = BufReader::new(File::open(file_path)?); @@ -314,13 +316,14 @@ impl TxDAIndexer { tx_order_hashes.push(item); } tx_order_hashes.sort_by(|a, b| a.tx_order.cmp(&b.tx_order)); // avoiding wrong order - tracing::info!( + info!( "tx_order:tx_hash:block indexer loaded, tx cnt: {}", tx_order_hashes.len() ); Ok(TxDAIndexer { tx_order_hash_blocks: tx_order_hashes, transaction_store, + rooch_store, }) } @@ -383,8 +386,18 @@ impl TxDAIndexer { &self, tx_hash: H256, ) -> anyhow::Result> { - let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; - Ok(execution_info) + self.transaction_store.get_tx_execution_info(tx_hash) + } + + pub fn get_sequencer_info( + &self, + tx_hash: H256, + ) -> anyhow::Result> { + Ok(self + .rooch_store + .transaction_store + .get_transaction_by_hash(tx_hash)? + .map(|transaction| transaction.sequence_info)) } pub fn get_execution_info_by_order( diff --git a/crates/rooch/src/commands/db/commands/dump_tx_root.rs b/crates/rooch/src/commands/db/commands/dump_tx_root.rs index 6472e67e30..c7c5794118 100644 --- a/crates/rooch/src/commands/db/commands/dump_tx_root.rs +++ b/crates/rooch/src/commands/db/commands/dump_tx_root.rs @@ -43,6 +43,7 @@ impl DumpTxRootCommand { let tx_da_indexer = TxDAIndexer::load_from_file( self.order_hash_path.clone(), moveos_store.transaction_store, + rooch_db.rooch_store.clone(), )?; let file = File::create(self.output.clone())?; diff --git a/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs b/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs index 3ea6ef001b..ccea833bd9 100644 --- a/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs +++ b/crates/rooch/src/commands/db/commands/get_execution_info_by_order.rs @@ -38,6 +38,7 @@ impl GetExecutionInfoByOrderCommand { let tx_da_indexer = TxDAIndexer::load_from_file( self.order_hash_path.clone(), moveos_store.transaction_store, + rooch_db.rooch_store.clone(), )?; let tx_order = self.order;