diff --git a/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs b/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs new file mode 100644 index 0000000000..cf640a96fa --- /dev/null +++ b/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs @@ -0,0 +1,59 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::commands::da::commands::{LedgerTxGetter, TxOrderHashBlock}; +use rooch_types::error::{RoochError, RoochResult}; +use std::fs::File; +use std::io::BufWriter; +use std::io::Write; +use std::path::PathBuf; + +/// Dump tx_order:tx_hash:block_number to a file from segments +#[derive(Debug, clap::Parser)] +pub struct DumpTxOrderHashCommand { + #[clap(long = "segment-dir")] + pub segment_dir: PathBuf, + #[clap(long = "output")] + pub output: PathBuf, +} + +impl DumpTxOrderHashCommand { + pub fn execute(self) -> RoochResult<()> { + let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir)?; + let mut block_number = ledger_tx_loader.get_min_chunk_id(); + let mut expected_tx_order = 0; + let file = File::create(self.output.clone())?; + let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); + + loop { + if block_number > ledger_tx_loader.get_max_chunk_id() { + break; + } + let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?; + let tx_list = tx_list.unwrap(); + for mut ledger_tx in tx_list { + let tx_order = ledger_tx.sequence_info.tx_order; + let tx_hash = ledger_tx.tx_hash(); + if expected_tx_order == 0 { + expected_tx_order = tx_order; + } else if tx_order != expected_tx_order { + return Err(RoochError::from(anyhow::anyhow!( + "tx_order mismatch: expected {}, got {}", + expected_tx_order, + tx_order + ))); + } + writeln!( + writer, + "{}", + TxOrderHashBlock::new(tx_order, tx_hash, block_number) + )?; + expected_tx_order += 1; + } + block_number += 1; + } + writer.flush()?; + file.sync_data()?; + Ok(()) + } +} diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index d969a5809c..d0020e4c99 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -1,7 +1,8 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::da::commands::{collect_chunks, get_tx_list_from_chunk}; +use crate::commands::da::commands::{build_rooch_db, LedgerTxGetter, TxOrderHashBlockGetter}; +use anyhow::Context; use bitcoin::hashes::Hash; use bitcoin_client::actor::client::BitcoinClientConfig; use bitcoin_client::proxy::BitcoinClientProxy; @@ -9,12 +10,14 @@ use clap::Parser; use coerce::actor::system::ActorSystem; use coerce::actor::IntoActor; use metrics::RegistryService; -use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; -use moveos_store::MoveOSStore; +use moveos_common::utils::to_bytes; +use moveos_store::config_store::STARTUP_INFO_KEY; +use moveos_store::{MoveOSStore, CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME}; use moveos_types::h256::H256; -use moveos_types::moveos_std::object::ObjectMeta; -use moveos_types::transaction::VerifiedMoveOSTransaction; -use rooch_config::RoochOpt; +use moveos_types::startup_info; +use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction}; +use raw_store::rocks::batch::WriteBatch; +use raw_store::traits::DBStore; use rooch_config::R_OPT_NET_HELP; use rooch_db::RoochDB; use rooch_executor::actor::executor::ExecutorActor; @@ -34,6 +37,8 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; +use tokio::sync::watch; +use tokio::time; /// exec LedgerTransaction List for verification. #[derive(Debug, Parser)] @@ -64,82 +69,17 @@ pub struct ExecCommand { #[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")] pub enable_rocks_stats: bool, -} - -async fn build_btc_client_proxy( - btc_rpc_url: String, - btc_rpc_user_name: String, - btc_rpc_password: String, - actor_system: &ActorSystem, -) -> anyhow::Result { - let bitcoin_client_config = BitcoinClientConfig { - btc_rpc_url, - btc_rpc_user_name, - btc_rpc_password, - }; - - let bitcoin_client = bitcoin_client_config.build()?; - let bitcoin_client_actor_ref = bitcoin_client - .into_actor(Some("bitcoin_client_for_rpc_service"), actor_system) - .await?; - Ok(BitcoinClientProxy::new(bitcoin_client_actor_ref.into())) -} - -fn build_rooch_db( - base_data_dir: Option, - chain_id: Option, - enable_rocks_stats: bool, -) -> (ObjectMeta, RoochDB) { - let mut opt = RoochOpt::new_with_default(base_data_dir, chain_id, None).unwrap(); - opt.store.enable_statistics = enable_rocks_stats; - let registry_service = RegistryService::default(); - let rooch_db = RoochDB::init(opt.store_config(), ®istry_service.default_registry()).unwrap(); - let root = rooch_db.latest_root().unwrap().unwrap(); - (root, rooch_db) -} - -async fn build_executor_and_store( - base_data_dir: Option, - chain_id: Option, - actor_system: &ActorSystem, - enable_rocks_stats: bool, -) -> anyhow::Result<(ExecutorProxy, MoveOSStore)> { - let registry_service = RegistryService::default(); - - let (root, rooch_db) = - build_rooch_db(base_data_dir.clone(), chain_id.clone(), enable_rocks_stats); - let (rooch_store, moveos_store) = (rooch_db.rooch_store.clone(), rooch_db.moveos_store.clone()); - - let executor_actor = ExecutorActor::new( - root.clone(), - moveos_store.clone(), - rooch_store.clone(), - ®istry_service.default_registry(), - None, - )?; - let executor_actor_ref = executor_actor - .into_actor(Some("Executor"), actor_system) - .await?; - - let reader_executor = ReaderExecutorActor::new( - root.clone(), - moveos_store.clone(), - rooch_store.clone(), - None, - )?; - - let read_executor_ref = reader_executor - .into_actor(Some("ReadExecutor"), actor_system) - .await?; - - Ok(( - ExecutorProxy::new( - executor_actor_ref.clone().into(), - read_executor_ref.clone().into(), - ), - moveos_store, - )) + #[clap( + long = "order-hash-path", + help = "Path to tx_order:tx_hash:block_number file" + )] + pub order_hash_path: PathBuf, + #[clap( + long = "rollback", + help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." + )] + pub rollback: Option, } impl ExecCommand { @@ -158,7 +98,7 @@ impl ExecCommand { &actor_system, ) .await?; - let (executor, moveos_store) = build_executor_and_store( + let (executor, moveos_store, rooch_db) = build_executor_and_store( self.base_data_dir.clone(), self.chain_id.clone(), &actor_system, @@ -167,19 +107,23 @@ impl ExecCommand { .await?; let (order_state_pair, tx_order_end) = self.load_order_state_pair(); - let (chunks, max_chunk_id) = collect_chunks(self.segment_dir.clone())?; + let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?; + let tx_order_hash_block_list = TxOrderHashBlockGetter::load_from_file( + self.order_hash_path.clone(), + moveos_store.transaction_store, + )?; Ok(ExecInner { - segment_dir: self.segment_dir.clone(), - chunks, - max_chunk_id, + ledger_tx_getter: ledger_tx_loader, + tx_order_hash_block_getter: tx_order_hash_block_list, order_state_pair, tx_order_end, bitcoin_client_proxy, executor, - transaction_store: moveos_store.transaction_store, produced: Arc::new(AtomicU64::new(0)), done: Arc::new(AtomicU64::new(0)), executed_tx_order: Arc::new(AtomicU64::new(0)), + rollback: self.rollback, + rooch_db, }) } @@ -204,16 +148,16 @@ impl ExecCommand { } struct ExecInner { - segment_dir: PathBuf, - chunks: HashMap>, - max_chunk_id: u128, + ledger_tx_getter: LedgerTxGetter, + tx_order_hash_block_getter: TxOrderHashBlockGetter, order_state_pair: HashMap, tx_order_end: u64, bitcoin_client_proxy: BitcoinClientProxy, executor: ExecutorProxy, - transaction_store: TransactionDBStore, + rooch_db: RoochDB, + rollback: Option, // stats produced: Arc, @@ -228,36 +172,51 @@ struct ExecMsg { } impl ExecInner { - async fn run(&self) -> anyhow::Result<()> { - let done_clone = self.done.clone(); - let executed_tx_order_clone = self.executed_tx_order.clone(); - let produced_clone = self.produced.clone(); + fn start_logging_task(&self, shutdown_signal: watch::Receiver<()>) { + let done_cloned = self.done.clone(); + let executed_tx_order_cloned = self.executed_tx_order.clone(); + let produced_cloned = self.produced.clone(); + tokio::spawn(async move { + let mut shutdown_signal = shutdown_signal; + + let mut interval = time::interval(Duration::from_secs(60)); + interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { - tokio::time::sleep(Duration::from_secs(60)).await; - let done = done_clone.load(std::sync::atomic::Ordering::Relaxed); - let executed_tx_order = - executed_tx_order_clone.load(std::sync::atomic::Ordering::Relaxed); - let produced = produced_clone.load(std::sync::atomic::Ordering::Relaxed); - tracing::info!( - "produced: {}, done: {}, max executed_tx_order: {}", - produced, - done, - executed_tx_order - ); + tokio::select! { + _ = shutdown_signal.changed() => { + tracing::info!("Shutting down logging task."); + break; + } + _ = interval.tick() => { + let done = done_cloned.load(std::sync::atomic::Ordering::Relaxed); + let executed_tx_order = executed_tx_order_cloned.load(std::sync::atomic::Ordering::Relaxed); + let produced = produced_cloned.load(std::sync::atomic::Ordering::Relaxed); + + tracing::info!( + "produced: {}, done: {}, max executed_tx_order: {}", + produced, + done, + executed_tx_order + ); + } + } } }); + } - // larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block. - // after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking. - // 16384 is a magic number, it's a trade-off between memory usage and performance. (usually tx count inside a block is under 8192, MAX_TXS_PER_BLOCK_IN_FIX) - let (tx, rx) = tokio::sync::mpsc::channel(16384); - let producer = self.produce_tx(tx); - let consumer = self.consume_tx(rx); - + // Joins the producer and consumer, handling results. + async fn join_producer_and_consumer( + &self, + producer: impl std::future::Future>, + consumer: impl std::future::Future>, + ) -> anyhow::Result<()> { let (producer_result, consumer_result) = tokio::join!(producer, consumer); + + // Error handling: Match the producer and consumer results. match (producer_result, consumer_result) { - (Ok(()), Ok(())) => Ok(()), // Both succeeded + (Ok(()), Ok(())) => Ok(()), (Err(producer_err), Ok(())) => Err(producer_err.context("Error in producer")), (Ok(()), Err(consumer_err)) => Err(consumer_err.context("Error in consumer")), (Err(producer_err), Err(consumer_err)) => { @@ -267,70 +226,121 @@ impl ExecInner { } } - fn find_begin_chunk(&self) -> anyhow::Result { - // binary-search from chunk [0, max_chunk_id], find max chunk_id that is finished. - let mut left = 0; - let mut right = self.max_chunk_id; - while left < right { - let mid = left + (right - left) / 2; - if self.is_chunk_finished(mid)? { - left = mid + 1; - } else { - right = mid; - } - } - Ok(left) + async fn run(&self) -> anyhow::Result<()> { + let (shutdown_tx, shutdown_rx) = watch::channel(()); + self.start_logging_task(shutdown_rx); + + // larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block. + // after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking. + // 16384 is a magic number, it's a trade-off between memory usage and performance. (usually tx count inside a block is under 8192, MAX_TXS_PER_BLOCK_IN_FIX) + let (tx, rx) = tokio::sync::mpsc::channel(16384); + let producer = self.produce_tx(tx); + let consumer = self.consume_tx(rx); + + let result = self.join_producer_and_consumer(producer, consumer).await; + + // Send shutdown signal and ensure logging task exits + let _ = shutdown_tx.send(()); + result } - fn is_chunk_finished(&self, chunk_id: u128) -> anyhow::Result { - let segments = self.chunks.get(&chunk_id); - if segments.is_none() { - return Err(anyhow::anyhow!("chunk: {} not found", chunk_id)); - } - let mut tx_list = get_tx_list_from_chunk( - self.segment_dir.clone(), - chunk_id, - segments.unwrap().clone(), + fn update_startup_info_after_rollback( + &self, + execution_info: TransactionExecutionInfo, + ) -> anyhow::Result<()> { + let rollback_startup_info = + startup_info::StartupInfo::new(execution_info.state_root, execution_info.size); + + let inner_store = &self.rooch_db.rooch_store.store_instance; + let mut write_batch = WriteBatch::new(); + let cf_names = vec![CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME]; + + write_batch.put( + to_bytes(STARTUP_INFO_KEY).unwrap(), + to_bytes(&rollback_startup_info).unwrap(), )?; - let last_tx_in_chunk = tx_list - .last_mut() - .unwrap_or_else(|| panic!("chunk: {} tx_list is empty", chunk_id)); - let last_tx_hash = last_tx_in_chunk.tx_hash(); - self.is_tx_executed(last_tx_hash) - } - fn is_tx_executed(&self, tx_hash: H256) -> anyhow::Result { - let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; - Ok(execution_info.is_some()) + inner_store.write_batch_across_cfs(cf_names, write_batch, true) } async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { - let mut block_number = self.find_begin_chunk()?; + let last_executed_opt = self.tx_order_hash_block_getter.find_last_executed()?; + let mut next_tx_order = last_executed_opt + .clone() + .map(|v| v.tx_order + 1) + .unwrap_or(1); + let mut next_block_number = last_executed_opt + .clone() + .map(|v| v.block_number + 1) + .unwrap_or(0); + tracing::info!( + "next_tx_order: {:?}. need rollback soon: {:?}", + next_tx_order, + self.rollback.is_some() + ); + + // If rollback not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order + if let (Some(rollback), Some(last_executed)) = (self.rollback, last_executed_opt.clone()) { + let last_executed_tx_order = last_executed.tx_order; + if rollback < last_executed_tx_order { + let new_last_and_rollback = self + .tx_order_hash_block_getter + .slice(rollback, last_executed_tx_order)?; + // split into two parts, the first get execution info for new startup, all others rollback + let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); + tracing::info!( + "Start to rollback transactions tx_order: [{}, {}]", + rollback_part.first().unwrap().tx_order, + rollback_part.last().unwrap().tx_order, + ); + for need_revert in rollback_part.iter() { + self.rooch_db + .revert_tx_unsafe(need_revert.tx_order, need_revert.tx_hash) + .map_err(|err| { + anyhow::anyhow!( + "Error reverting transaction {}: {:?}", + need_revert.tx_order, + err + ) + })?; + } + let rollback_execution_info = self + .tx_order_hash_block_getter + .get_execution_info(new_last.tx_hash)?; + self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?; + next_block_number = new_last.block_number; + next_tx_order = rollback + 1; + tracing::info!("Rollback transactions done",); + } + }; - tracing::info!("Start to produce transactions from block: {}", block_number); + tracing::info!( + "Start to produce transactions from tx_order: {}, check from block: {}", + next_tx_order, + next_block_number, + ); let mut produced_tx_order = 0; - let mut executed = true; + let mut reach_end = false; loop { - let tx_list = self.load_ledger_tx_list(block_number)?; + if reach_end { + break; + } + let tx_list = self + .ledger_tx_getter + .load_ledger_tx_list(next_block_number, false)?; if tx_list.is_none() { - block_number -= 1; // no chunk belongs to this block_number + next_block_number -= 1; // no chunk belongs to this block_number break; } let tx_list = tx_list.unwrap(); - for mut ledger_tx in tx_list { + for ledger_tx in tx_list { let tx_order = ledger_tx.sequence_info.tx_order; if tx_order > self.tx_order_end { + reach_end = true; break; } - if executed { - let execution_info = self - .transaction_store - .get_tx_execution_info(ledger_tx.data.tx_hash())?; - if execution_info.is_some() { - continue; - } - tracing::info!("tx_order: {} is not executed, begin at here", tx_order); - executed = false; + if tx_order < next_tx_order { + continue; } let l1_block_with_body = match &ledger_tx.data { @@ -354,11 +364,11 @@ impl ExecInner { self.produced .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - block_number += 1; + next_block_number += 1; } tracing::info!( "All transactions are produced, max_block_number: {}, max_tx_order: {}", - block_number, + next_block_number, produced_tx_order ); Ok(()) @@ -376,7 +386,12 @@ impl ExecInner { let exec_msg = exec_msg_opt.unwrap(); let tx_order = exec_msg.tx_order; - self.execute(exec_msg).await?; + self.execute(exec_msg).await.with_context(|| { + format!( + "Error executing transaction: tx_order: {}, executed_tx_order: {}", + tx_order, executed_tx_order + ) + })?; executed_tx_order = tx_order; self.executed_tx_order @@ -402,22 +417,6 @@ impl ExecInner { Ok(()) } - fn load_ledger_tx_list( - &self, - block_number: u128, - ) -> anyhow::Result>> { - let segments = self.chunks.get(&block_number); - if segments.is_none() { - return Ok(None); - } - let tx_list = get_tx_list_from_chunk( - self.segment_dir.clone(), - block_number, - segments.unwrap().clone(), - )?; - Ok(Some(tx_list)) - } - async fn execute(&self, msg: ExecMsg) -> anyhow::Result<()> { let ExecMsg { tx_order, @@ -454,7 +453,7 @@ impl ExecInner { ) -> anyhow::Result<()> { let executor = self.executor.clone(); - let (_output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; + let (output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; let root = execution_info.root_metadata(); let expected_root_opt = self.order_state_pair.get(&tx_order); @@ -462,9 +461,9 @@ impl ExecInner { Some(expected_root) => { if root.state_root.unwrap() != *expected_root { return Err(anyhow::anyhow!( - "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}", + "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}; act_changeset: {:?}", tx_order, - *expected_root, root.state_root.unwrap() + *expected_root, root.state_root.unwrap(), output.changeset )); } tracing::info!( @@ -477,3 +476,67 @@ impl ExecInner { } } } + +async fn build_btc_client_proxy( + btc_rpc_url: String, + btc_rpc_user_name: String, + btc_rpc_password: String, + actor_system: &ActorSystem, +) -> anyhow::Result { + let bitcoin_client_config = BitcoinClientConfig { + btc_rpc_url, + btc_rpc_user_name, + btc_rpc_password, + }; + + let bitcoin_client = bitcoin_client_config.build()?; + let bitcoin_client_actor_ref = bitcoin_client + .into_actor(Some("bitcoin_client_for_rpc_service"), actor_system) + .await?; + Ok(BitcoinClientProxy::new(bitcoin_client_actor_ref.into())) +} + +async fn build_executor_and_store( + base_data_dir: Option, + chain_id: Option, + actor_system: &ActorSystem, + enable_rocks_stats: bool, +) -> anyhow::Result<(ExecutorProxy, MoveOSStore, RoochDB)> { + let registry_service = RegistryService::default(); + + let (root, rooch_db) = + build_rooch_db(base_data_dir.clone(), chain_id.clone(), enable_rocks_stats); + let (rooch_store, moveos_store) = (rooch_db.rooch_store.clone(), rooch_db.moveos_store.clone()); + + let executor_actor = ExecutorActor::new( + root.clone(), + moveos_store.clone(), + rooch_store.clone(), + ®istry_service.default_registry(), + None, + )?; + + let executor_actor_ref = executor_actor + .into_actor(Some("Executor"), actor_system) + .await?; + + let reader_executor = ReaderExecutorActor::new( + root.clone(), + moveos_store.clone(), + rooch_store.clone(), + None, + )?; + + let read_executor_ref = reader_executor + .into_actor(Some("ReadExecutor"), actor_system) + .await?; + + Ok(( + ExecutorProxy::new( + executor_actor_ref.clone().into(), + read_executor_ref.clone().into(), + ), + moveos_store, + rooch_db, + )) +} diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index e952890cc8..85fec98f3a 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -1,13 +1,24 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use metrics::RegistryService; +use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; +use moveos_types::h256::H256; +use moveos_types::moveos_std::object::ObjectMeta; +use moveos_types::transaction::TransactionExecutionInfo; +use rooch_config::RoochOpt; +use rooch_db::RoochDB; use rooch_types::da::chunk::chunk_from_segments; use rooch_types::da::segment::{segment_from_bytes, SegmentID}; +use rooch_types::rooch_network::RoochChainID; use rooch_types::transaction::LedgerTransaction; use std::collections::HashMap; use std::fs; +use std::fs::File; +use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; +pub mod dump_tx_order_hash; pub mod exec; pub mod namespace; pub mod unpack; @@ -18,10 +29,11 @@ pub mod unpack; // we collect all the chunks and their segment numbers to unpack them later. pub(crate) fn collect_chunks( segment_dir: PathBuf, -) -> anyhow::Result<(HashMap>, u128)> { +) -> anyhow::Result<(HashMap>, u128, u128)> { let mut chunks = HashMap::new(); let mut max_chunk_id = 0; - for entry in fs::read_dir(segment_dir)?.flatten() { + let mut min_chunk_id = u128::MAX; + for entry in fs::read_dir(segment_dir.clone())?.flatten() { let path = entry.path(); if path.is_file() { if let Some(segment_id) = path @@ -35,10 +47,16 @@ pub(crate) fn collect_chunks( if chunk_id > max_chunk_id { max_chunk_id = chunk_id; } + if chunk_id < min_chunk_id { + min_chunk_id = chunk_id; + } } } } - Ok((chunks, max_chunk_id)) + if chunks.is_empty() { + return Err(anyhow::anyhow!("No segment found in {:?}", segment_dir)); + } + Ok((chunks, min_chunk_id, max_chunk_id)) } pub(crate) fn get_tx_list_from_chunk( @@ -62,3 +80,197 @@ pub(crate) fn get_tx_list_from_chunk( batch.verify(true)?; Ok(batch.get_tx_list()) } + +pub(crate) fn build_rooch_db( + base_data_dir: Option, + chain_id: Option, + enable_rocks_stats: bool, +) -> (ObjectMeta, RoochDB) { + let mut opt = RoochOpt::new_with_default(base_data_dir, chain_id, None).unwrap(); + opt.store.enable_statistics = enable_rocks_stats; + let registry_service = RegistryService::default(); + let rooch_db = RoochDB::init(opt.store_config(), ®istry_service.default_registry()).unwrap(); + let root = rooch_db.latest_root().unwrap().unwrap(); + (root, rooch_db) +} + +pub struct LedgerTxGetter { + segment_dir: PathBuf, + chunks: HashMap>, + min_chunk_id: u128, + max_chunk_id: u128, +} + +impl LedgerTxGetter { + pub fn new(segment_dir: PathBuf) -> anyhow::Result { + let (chunks, min_chunk_id, max_chunk_id) = collect_chunks(segment_dir.clone())?; + + Ok(LedgerTxGetter { + segment_dir, + chunks, + min_chunk_id, + max_chunk_id, + }) + } + + pub fn load_ledger_tx_list( + &self, + chunk_id: u128, + must_has: bool, + ) -> anyhow::Result>> { + let segments = self.chunks.get(&chunk_id); + if segments.is_none() { + if must_has { + return Err(anyhow::anyhow!("No segment found in chunk {}", chunk_id)); + } + return Ok(None); + } + let tx_list = get_tx_list_from_chunk( + self.segment_dir.clone(), + chunk_id, + segments.unwrap().clone(), + )?; + Ok(Some(tx_list)) + } + + pub fn get_max_chunk_id(&self) -> u128 { + self.max_chunk_id + } + + pub fn get_min_chunk_id(&self) -> u128 { + self.min_chunk_id + } +} + +#[derive(Debug, Clone)] +pub struct TxOrderHashBlock { + pub tx_order: u64, + pub tx_hash: H256, + pub block_number: u128, +} + +impl TxOrderHashBlock { + pub fn new(tx_order: u64, tx_hash: H256, block_number: u128) -> Self { + TxOrderHashBlock { + tx_order, + tx_hash, + block_number, + } + } +} + +impl std::fmt::Display for TxOrderHashBlock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{:?}:{}", + self.tx_order, self.tx_hash, self.block_number + ) + } +} + +impl std::str::FromStr for TxOrderHashBlock { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let parts: Vec<&str> = s.split(':').collect(); + if parts.len() != 3 { + return Err(anyhow::anyhow!("Invalid format")); + } + let tx_order = parts[0].parse::()?; + let tx_hash = H256::from_str(parts[1])?; + let block_number = parts[2].parse::()?; + Ok(TxOrderHashBlock { + tx_order, + tx_hash, + block_number, + }) + } +} + +pub struct TxOrderHashBlockGetter { + tx_order_hash_blocks: Vec, + transaction_store: TransactionDBStore, +} + +impl TxOrderHashBlockGetter { + pub fn load_from_file( + file_path: PathBuf, + transaction_store: TransactionDBStore, + ) -> anyhow::Result { + let mut tx_order_hashes = Vec::with_capacity(70000000); + let mut reader = BufReader::new(File::open(file_path)?); + for line in reader.by_ref().lines() { + let line = line?; + let item = line.parse::()?; + tx_order_hashes.push(item); + } + Ok(TxOrderHashBlockGetter { + tx_order_hash_blocks: tx_order_hashes, + transaction_store, + }) + } + + pub fn slice( + &self, + start_tx_order: u64, + end_tx_order: u64, + ) -> anyhow::Result> { + let r = self + .tx_order_hash_blocks + .binary_search_by(|x| x.tx_order.cmp(&start_tx_order)); + let start_idx = match r { + Ok(i) => i, + Err(_) => { + return Err(anyhow::anyhow!("start_tx_order not found")); + } + }; + let end_idx = start_idx + (end_tx_order - start_tx_order) as usize; + Ok(self.tx_order_hash_blocks[start_idx..end_idx + 1].to_vec()) + } + + pub fn find_last_executed(&self) -> anyhow::Result> { + // Check for an empty list + if self.tx_order_hash_blocks.is_empty() { + return Ok(None); + } + + // Binary search + let mut left = 0; + let mut right = self.tx_order_hash_blocks.len() - 1; + while left < right { + let mid = (left + right) / 2; + let tx_order_hash_block = &self.tx_order_hash_blocks[mid]; + let executed = self.has_executed(tx_order_hash_block.tx_hash)?; + if executed { + left = mid + 1; + } else { + right = mid; + } + } + + // Determine result + let last_executed = self.has_executed(self.tx_order_hash_blocks[left].tx_hash)?; + if left == 0 && !last_executed { + return Ok(None); + } + if !last_executed { + Ok(Some(self.tx_order_hash_blocks[left - 1].clone())) + } else { + Ok(Some(self.tx_order_hash_blocks[left].clone())) + } + } + + pub fn has_executed(&self, tx_hash: H256) -> anyhow::Result { + let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; + Ok(execution_info.is_some()) + } + + pub fn get_execution_info( + &self, + tx_hash: H256, + ) -> anyhow::Result> { + let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; + Ok(execution_info) + } +} diff --git a/crates/rooch/src/commands/da/commands/unpack.rs b/crates/rooch/src/commands/da/commands/unpack.rs index 5b985fd75d..fbbc4fffdd 100644 --- a/crates/rooch/src/commands/da/commands/unpack.rs +++ b/crates/rooch/src/commands/da/commands/unpack.rs @@ -118,7 +118,7 @@ impl UnpackInner { } fn collect_chunks(&mut self) -> anyhow::Result<()> { - let (chunks, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?; + let (chunks, _min_chunk_id, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?; self.chunks = chunks; Ok(()) } diff --git a/crates/rooch/src/commands/da/mod.rs b/crates/rooch/src/commands/da/mod.rs index 9faa8e0c8c..a97a5a342c 100644 --- a/crates/rooch/src/commands/da/mod.rs +++ b/crates/rooch/src/commands/da/mod.rs @@ -4,6 +4,7 @@ pub mod commands; use crate::cli_types::CommandAction; +use crate::commands::da::commands::dump_tx_order_hash::DumpTxOrderHashCommand; use crate::commands::da::commands::exec::ExecCommand; use crate::commands::da::commands::namespace::NamespaceCommand; use crate::commands::da::commands::unpack::UnpackCommand; @@ -25,6 +26,9 @@ impl CommandAction for DA { DACommand::Unpack(unpack) => unpack.execute().map(|_| "".to_owned()), DACommand::Namespace(namespace) => namespace.execute().map(|_| "".to_owned()), DACommand::Exec(exec) => exec.execute().await.map(|_| "".to_owned()), + DACommand::DumpTxOrderHash(dump_tx_order_hash) => { + dump_tx_order_hash.execute().map(|_| "".to_owned()) + } } } } @@ -35,4 +39,5 @@ pub enum DACommand { Unpack(UnpackCommand), Namespace(NamespaceCommand), Exec(ExecCommand), + DumpTxOrderHash(DumpTxOrderHashCommand), } diff --git a/crates/rooch/src/commands/db/commands/rollback.rs b/crates/rooch/src/commands/db/commands/rollback.rs index af7da7a592..abe0ebde72 100644 --- a/crates/rooch/src/commands/db/commands/rollback.rs +++ b/crates/rooch/src/commands/db/commands/rollback.rs @@ -118,9 +118,9 @@ impl RollbackCommand { ); // tx_hash lost: // 1. rollback incomplete cause last_order not updated - // 2. database inconsistent (use other method to check/repair) + // 2. the database is inconsistent (use another method to check/repair) // - // it's okay to continue rollback, after revert all txs, the last_order will be updated later + // it's okay to continue rollback, after reverting all txs; the last_order will be updated later continue; } let tx_hash = tx_hashes[0].unwrap();