From 06aeab3fae59f5eae1a0021c549673342adfc633 Mon Sep 17 00:00:00 2001 From: popcnt1 Date: Wed, 18 Dec 2024 22:33:22 +0800 Subject: [PATCH 1/5] feat(rooch-da): implement transaction order hash command Add `get_tx_order_hash` command to process transaction order and hash data. Introduced `LedgerTxLoader` for loading and verifying ledger transactions, and updated DACommand to integrate the new functionality. --- .../commands/da/commands/get_tx_order_hash.rs | 61 +++++++++++++++ crates/rooch/src/commands/da/commands/mod.rs | 76 ++++++++++++++++++- crates/rooch/src/commands/da/mod.rs | 5 ++ 3 files changed, 139 insertions(+), 3 deletions(-) create mode 100644 crates/rooch/src/commands/da/commands/get_tx_order_hash.rs diff --git a/crates/rooch/src/commands/da/commands/get_tx_order_hash.rs b/crates/rooch/src/commands/da/commands/get_tx_order_hash.rs new file mode 100644 index 0000000000..004987885e --- /dev/null +++ b/crates/rooch/src/commands/da/commands/get_tx_order_hash.rs @@ -0,0 +1,61 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::commands::da::commands::LedgerTxLoader; +use async_trait::async_trait; +use bitcoin::io::Write; +use rooch_types::error::RoochResult; +use rooch_types::transaction::{L1BlockWithBody, LedgerTxData}; +use std::fs::File; +use std::io::BufWriter; +use std::path::PathBuf; + +/// Get transactions by hashes +#[derive(Debug, clap::Parser)] +pub struct GetTxOrderHashCommand { + #[clap(long = "segment-dir")] + pub segment_dir: PathBuf, + #[clap(long = "output")] + pub output: PathBuf, +} + +#[async_trait] +impl GetTxOrderHashCommand { + pub fn execute(self) -> RoochResult<()> { + let ledger_tx_loader = LedgerTxLoader::new(self.segment_dir)?; + let mut block_number = ledger_tx_loader.get_min_chunk_id(); + let mut expected_tx_order = 0; + let file = File::create(self.output.clone())?; + let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); + + loop { + let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number)?; + if tx_list.is_none() { + break; + } + let tx_list = tx_list.unwrap(); + for mut ledger_tx in tx_list { + let tx_order = ledger_tx.sequence_info.tx_order; + let tx_hash = ledger_tx.tx_hash(); + if expected_tx_order == 0 { + expected_tx_order = tx_order; + } else { + if tx_order != expected_tx_order { + tracing::error!( + "Tx order not expected, expected: {}, actual: {}, tx_hash: {}", + expected_tx_order, + tx_order, + tx_hash + ); + } + expected_tx_order += 1; + } + writeln!(writer, "{}:{:?}", tx_order, tx_hash)?; + } + block_number += 1; + } + writer.flush()?; + file.sync_data()?; + Ok(()) + } +} diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index e952890cc8..d15995981d 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -1,14 +1,20 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use metrics::RegistryService; +use moveos_types::moveos_std::object::ObjectMeta; +use rooch_config::RoochOpt; +use rooch_db::RoochDB; use rooch_types::da::chunk::chunk_from_segments; use rooch_types::da::segment::{segment_from_bytes, SegmentID}; +use rooch_types::rooch_network::RoochChainID; use rooch_types::transaction::LedgerTransaction; use std::collections::HashMap; use std::fs; use std::path::PathBuf; pub mod exec; +pub mod get_tx_order_hash; pub mod namespace; pub mod unpack; @@ -18,10 +24,11 @@ pub mod unpack; // we collect all the chunks and their segment numbers to unpack them later. pub(crate) fn collect_chunks( segment_dir: PathBuf, -) -> anyhow::Result<(HashMap>, u128)> { +) -> anyhow::Result<(HashMap>, u128, u128)> { let mut chunks = HashMap::new(); let mut max_chunk_id = 0; - for entry in fs::read_dir(segment_dir)?.flatten() { + let mut min_chunk_id = u128::MAX; + for entry in fs::read_dir(segment_dir.clone())?.flatten() { let path = entry.path(); if path.is_file() { if let Some(segment_id) = path @@ -35,10 +42,16 @@ pub(crate) fn collect_chunks( if chunk_id > max_chunk_id { max_chunk_id = chunk_id; } + if chunk_id < min_chunk_id { + min_chunk_id = chunk_id; + } } } } - Ok((chunks, max_chunk_id)) + if chunks.is_empty() { + return Err(anyhow::anyhow!("No segment found in {:?}", segment_dir)); + } + Ok((chunks, min_chunk_id, max_chunk_id)) } pub(crate) fn get_tx_list_from_chunk( @@ -62,3 +75,60 @@ pub(crate) fn get_tx_list_from_chunk( batch.verify(true)?; Ok(batch.get_tx_list()) } + +pub(crate) fn build_rooch_db( + base_data_dir: Option, + chain_id: Option, + enable_rocks_stats: bool, +) -> (ObjectMeta, RoochDB) { + let mut opt = RoochOpt::new_with_default(base_data_dir, chain_id, None).unwrap(); + opt.store.enable_statistics = enable_rocks_stats; + let registry_service = RegistryService::default(); + let rooch_db = RoochDB::init(opt.store_config(), ®istry_service.default_registry()).unwrap(); + let root = rooch_db.latest_root().unwrap().unwrap(); + (root, rooch_db) +} + +pub struct LedgerTxLoader { + segment_dir: PathBuf, + chunks: HashMap>, + min_chunk_id: u128, + max_chunk_id: u128, +} + +impl LedgerTxLoader { + pub fn new(segment_dir: PathBuf) -> anyhow::Result { + let (chunks, min_chunk_id, max_chunk_id) = collect_chunks(segment_dir.clone())?; + + Ok(LedgerTxLoader { + segment_dir, + chunks, + min_chunk_id, + max_chunk_id, + }) + } + + pub fn load_ledger_tx_list( + &self, + block_number: u128, + ) -> anyhow::Result>> { + let segments = self.chunks.get(&block_number); + if segments.is_none() { + return Ok(None); + } + let tx_list = get_tx_list_from_chunk( + self.segment_dir.clone(), + block_number, + segments.unwrap().clone(), + )?; + Ok(Some(tx_list)) + } + + pub fn get_max_chunk_id(&self) -> u128 { + self.max_chunk_id + } + + pub fn get_min_chunk_id(&self) -> u128 { + self.min_chunk_id + } +} diff --git a/crates/rooch/src/commands/da/mod.rs b/crates/rooch/src/commands/da/mod.rs index 9faa8e0c8c..2e1e0b5545 100644 --- a/crates/rooch/src/commands/da/mod.rs +++ b/crates/rooch/src/commands/da/mod.rs @@ -5,6 +5,7 @@ pub mod commands; use crate::cli_types::CommandAction; use crate::commands::da::commands::exec::ExecCommand; +use crate::commands::da::commands::get_tx_order_hash::GetTxOrderHashCommand; use crate::commands::da::commands::namespace::NamespaceCommand; use crate::commands::da::commands::unpack::UnpackCommand; use async_trait::async_trait; @@ -25,6 +26,9 @@ impl CommandAction for DA { DACommand::Unpack(unpack) => unpack.execute().map(|_| "".to_owned()), DACommand::Namespace(namespace) => namespace.execute().map(|_| "".to_owned()), DACommand::Exec(exec) => exec.execute().await.map(|_| "".to_owned()), + DACommand::GetTxOrderHash(get_tx_order_hash) => { + get_tx_order_hash.execute().map(|_| "".to_owned()) + } } } } @@ -35,4 +39,5 @@ pub enum DACommand { Unpack(UnpackCommand), Namespace(NamespaceCommand), Exec(ExecCommand), + GetTxOrderHash(GetTxOrderHashCommand), } From 424462bca187db73ecadc1af9e0bb09f11131aec Mon Sep 17 00:00:00 2001 From: popcnt1 Date: Thu, 19 Dec 2024 05:22:18 +0800 Subject: [PATCH 2/5] feat(rooch-da): enhance transaction processing in exec command and and rollback Introduce new transaction and ledger handling with rollback and logging features. This includes TxOrderHashBlockGetter for advanced binary search and rollback management, improved logging with shutdown handling, and restructuring for clearer chunk processing and execution. --- ...tx_order_hash.rs => dump_tx_order_hash.rs} | 38 +- crates/rooch/src/commands/da/commands/exec.rs | 427 ++++++++++-------- crates/rooch/src/commands/da/commands/mod.rs | 146 +++++- .../rooch/src/commands/da/commands/unpack.rs | 2 +- crates/rooch/src/commands/da/mod.rs | 8 +- .../src/commands/db/commands/rollback.rs | 4 +- 6 files changed, 410 insertions(+), 215 deletions(-) rename crates/rooch/src/commands/da/commands/{get_tx_order_hash.rs => dump_tx_order_hash.rs} (60%) diff --git a/crates/rooch/src/commands/da/commands/get_tx_order_hash.rs b/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs similarity index 60% rename from crates/rooch/src/commands/da/commands/get_tx_order_hash.rs rename to crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs index 004987885e..60b102efdd 100644 --- a/crates/rooch/src/commands/da/commands/get_tx_order_hash.rs +++ b/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs @@ -1,38 +1,35 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::da::commands::LedgerTxLoader; -use async_trait::async_trait; -use bitcoin::io::Write; -use rooch_types::error::RoochResult; -use rooch_types::transaction::{L1BlockWithBody, LedgerTxData}; +use crate::commands::da::commands::{LedgerTxGetter, TxOrderHashBlock}; +use rooch_types::error::{RoochError, RoochResult}; use std::fs::File; use std::io::BufWriter; +use std::io::Write; use std::path::PathBuf; -/// Get transactions by hashes +/// Dump tx_order:tx_hash:block_number to a file from segments #[derive(Debug, clap::Parser)] -pub struct GetTxOrderHashCommand { +pub struct DumpTxOrderHashCommand { #[clap(long = "segment-dir")] pub segment_dir: PathBuf, #[clap(long = "output")] pub output: PathBuf, } -#[async_trait] -impl GetTxOrderHashCommand { +impl DumpTxOrderHashCommand { pub fn execute(self) -> RoochResult<()> { - let ledger_tx_loader = LedgerTxLoader::new(self.segment_dir)?; + let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir)?; let mut block_number = ledger_tx_loader.get_min_chunk_id(); let mut expected_tx_order = 0; let file = File::create(self.output.clone())?; let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); loop { - let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number)?; - if tx_list.is_none() { + if block_number > ledger_tx_loader.get_max_chunk_id() { break; } + let tx_list = ledger_tx_loader.load_ledger_tx_list(block_number, true)?; let tx_list = tx_list.unwrap(); for mut ledger_tx in tx_list { let tx_order = ledger_tx.sequence_info.tx_order; @@ -41,16 +38,19 @@ impl GetTxOrderHashCommand { expected_tx_order = tx_order; } else { if tx_order != expected_tx_order { - tracing::error!( - "Tx order not expected, expected: {}, actual: {}, tx_hash: {}", + return Err(RoochError::from(anyhow::anyhow!( + "tx_order mismatch: expected {}, got {}", expected_tx_order, - tx_order, - tx_hash - ); + tx_order + ))); } - expected_tx_order += 1; } - writeln!(writer, "{}:{:?}", tx_order, tx_hash)?; + writeln!( + writer, + "{}", + TxOrderHashBlock::new(tx_order, tx_hash, block_number).to_string() + )?; + expected_tx_order += 1; } block_number += 1; } diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index d969a5809c..2aa4b5a2ed 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -1,7 +1,8 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::da::commands::{collect_chunks, get_tx_list_from_chunk}; +use crate::commands::da::commands::{build_rooch_db, LedgerTxGetter, TxOrderHashBlockGetter}; +use anyhow::Context; use bitcoin::hashes::Hash; use bitcoin_client::actor::client::BitcoinClientConfig; use bitcoin_client::proxy::BitcoinClientProxy; @@ -9,12 +10,14 @@ use clap::Parser; use coerce::actor::system::ActorSystem; use coerce::actor::IntoActor; use metrics::RegistryService; -use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; -use moveos_store::MoveOSStore; +use moveos_common::utils::to_bytes; +use moveos_store::config_store::STARTUP_INFO_KEY; +use moveos_store::{MoveOSStore, CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME}; use moveos_types::h256::H256; -use moveos_types::moveos_std::object::ObjectMeta; -use moveos_types::transaction::VerifiedMoveOSTransaction; -use rooch_config::RoochOpt; +use moveos_types::startup_info; +use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction}; +use raw_store::rocks::batch::WriteBatch; +use raw_store::traits::DBStore; use rooch_config::R_OPT_NET_HELP; use rooch_db::RoochDB; use rooch_executor::actor::executor::ExecutorActor; @@ -34,6 +37,8 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; +use tokio::sync::watch; +use tokio::time; /// exec LedgerTransaction List for verification. #[derive(Debug, Parser)] @@ -64,82 +69,17 @@ pub struct ExecCommand { #[clap(long = "enable-rocks-stats", help = "rocksdb-enable-statistics")] pub enable_rocks_stats: bool, -} - -async fn build_btc_client_proxy( - btc_rpc_url: String, - btc_rpc_user_name: String, - btc_rpc_password: String, - actor_system: &ActorSystem, -) -> anyhow::Result { - let bitcoin_client_config = BitcoinClientConfig { - btc_rpc_url, - btc_rpc_user_name, - btc_rpc_password, - }; - - let bitcoin_client = bitcoin_client_config.build()?; - let bitcoin_client_actor_ref = bitcoin_client - .into_actor(Some("bitcoin_client_for_rpc_service"), actor_system) - .await?; - Ok(BitcoinClientProxy::new(bitcoin_client_actor_ref.into())) -} - -fn build_rooch_db( - base_data_dir: Option, - chain_id: Option, - enable_rocks_stats: bool, -) -> (ObjectMeta, RoochDB) { - let mut opt = RoochOpt::new_with_default(base_data_dir, chain_id, None).unwrap(); - opt.store.enable_statistics = enable_rocks_stats; - let registry_service = RegistryService::default(); - let rooch_db = RoochDB::init(opt.store_config(), ®istry_service.default_registry()).unwrap(); - let root = rooch_db.latest_root().unwrap().unwrap(); - (root, rooch_db) -} - -async fn build_executor_and_store( - base_data_dir: Option, - chain_id: Option, - actor_system: &ActorSystem, - enable_rocks_stats: bool, -) -> anyhow::Result<(ExecutorProxy, MoveOSStore)> { - let registry_service = RegistryService::default(); - - let (root, rooch_db) = - build_rooch_db(base_data_dir.clone(), chain_id.clone(), enable_rocks_stats); - let (rooch_store, moveos_store) = (rooch_db.rooch_store.clone(), rooch_db.moveos_store.clone()); - - let executor_actor = ExecutorActor::new( - root.clone(), - moveos_store.clone(), - rooch_store.clone(), - ®istry_service.default_registry(), - None, - )?; - - let executor_actor_ref = executor_actor - .into_actor(Some("Executor"), actor_system) - .await?; - - let reader_executor = ReaderExecutorActor::new( - root.clone(), - moveos_store.clone(), - rooch_store.clone(), - None, - )?; - - let read_executor_ref = reader_executor - .into_actor(Some("ReadExecutor"), actor_system) - .await?; - Ok(( - ExecutorProxy::new( - executor_actor_ref.clone().into(), - read_executor_ref.clone().into(), - ), - moveos_store, - )) + #[clap( + long = "order-hash-path", + help = "Path to tx_order:tx_hash:block_number file" + )] + pub order_hash_path: PathBuf, + #[clap( + long = "rollback", + help = "rollback to tx order. If not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order." + )] + pub rollback: Option, } impl ExecCommand { @@ -158,7 +98,7 @@ impl ExecCommand { &actor_system, ) .await?; - let (executor, moveos_store) = build_executor_and_store( + let (executor, moveos_store, rooch_db) = build_executor_and_store( self.base_data_dir.clone(), self.chain_id.clone(), &actor_system, @@ -167,19 +107,23 @@ impl ExecCommand { .await?; let (order_state_pair, tx_order_end) = self.load_order_state_pair(); - let (chunks, max_chunk_id) = collect_chunks(self.segment_dir.clone())?; + let ledger_tx_loader = LedgerTxGetter::new(self.segment_dir.clone())?; + let tx_order_hash_block_list = TxOrderHashBlockGetter::load_from_file( + self.order_hash_path.clone(), + moveos_store.transaction_store, + )?; Ok(ExecInner { - segment_dir: self.segment_dir.clone(), - chunks, - max_chunk_id, + ledger_tx_getter: ledger_tx_loader, + tx_order_hash_block_getter: tx_order_hash_block_list, order_state_pair, tx_order_end, bitcoin_client_proxy, executor, - transaction_store: moveos_store.transaction_store, produced: Arc::new(AtomicU64::new(0)), done: Arc::new(AtomicU64::new(0)), executed_tx_order: Arc::new(AtomicU64::new(0)), + rollback: self.rollback, + rooch_db, }) } @@ -204,16 +148,16 @@ impl ExecCommand { } struct ExecInner { - segment_dir: PathBuf, - chunks: HashMap>, - max_chunk_id: u128, + ledger_tx_getter: LedgerTxGetter, + tx_order_hash_block_getter: TxOrderHashBlockGetter, order_state_pair: HashMap, tx_order_end: u64, bitcoin_client_proxy: BitcoinClientProxy, executor: ExecutorProxy, - transaction_store: TransactionDBStore, + rooch_db: RoochDB, + rollback: Option, // stats produced: Arc, @@ -228,36 +172,51 @@ struct ExecMsg { } impl ExecInner { - async fn run(&self) -> anyhow::Result<()> { - let done_clone = self.done.clone(); - let executed_tx_order_clone = self.executed_tx_order.clone(); - let produced_clone = self.produced.clone(); + fn start_logging_task(&self, shutdown_signal: watch::Receiver<()>) { + let done_cloned = self.done.clone(); + let executed_tx_order_cloned = self.executed_tx_order.clone(); + let produced_cloned = self.produced.clone(); + tokio::spawn(async move { + let mut shutdown_signal = shutdown_signal; + + let mut interval = time::interval(Duration::from_secs(60)); + interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { - tokio::time::sleep(Duration::from_secs(60)).await; - let done = done_clone.load(std::sync::atomic::Ordering::Relaxed); - let executed_tx_order = - executed_tx_order_clone.load(std::sync::atomic::Ordering::Relaxed); - let produced = produced_clone.load(std::sync::atomic::Ordering::Relaxed); - tracing::info!( - "produced: {}, done: {}, max executed_tx_order: {}", - produced, - done, - executed_tx_order - ); + tokio::select! { + _ = shutdown_signal.changed() => { + tracing::info!("Shutting down logging task."); + break; + } + _ = interval.tick() => { + let done = done_cloned.load(std::sync::atomic::Ordering::Relaxed); + let executed_tx_order = executed_tx_order_cloned.load(std::sync::atomic::Ordering::Relaxed); + let produced = produced_cloned.load(std::sync::atomic::Ordering::Relaxed); + + tracing::info!( + "produced: {}, done: {}, max executed_tx_order: {}", + produced, + done, + executed_tx_order + ); + } + } } }); + } - // larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block. - // after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking. - // 16384 is a magic number, it's a trade-off between memory usage and performance. (usually tx count inside a block is under 8192, MAX_TXS_PER_BLOCK_IN_FIX) - let (tx, rx) = tokio::sync::mpsc::channel(16384); - let producer = self.produce_tx(tx); - let consumer = self.consume_tx(rx); - + // Joins the producer and consumer, handling results. + async fn join_producer_and_consumer( + &self, + producer: impl std::future::Future>, + consumer: impl std::future::Future>, + ) -> anyhow::Result<()> { let (producer_result, consumer_result) = tokio::join!(producer, consumer); + + // Error handling: Match the producer and consumer results. match (producer_result, consumer_result) { - (Ok(()), Ok(())) => Ok(()), // Both succeeded + (Ok(()), Ok(())) => Ok(()), (Err(producer_err), Ok(())) => Err(producer_err.context("Error in producer")), (Ok(()), Err(consumer_err)) => Err(consumer_err.context("Error in consumer")), (Err(producer_err), Err(consumer_err)) => { @@ -267,70 +226,121 @@ impl ExecInner { } } - fn find_begin_chunk(&self) -> anyhow::Result { - // binary-search from chunk [0, max_chunk_id], find max chunk_id that is finished. - let mut left = 0; - let mut right = self.max_chunk_id; - while left < right { - let mid = left + (right - left) / 2; - if self.is_chunk_finished(mid)? { - left = mid + 1; - } else { - right = mid; - } - } - Ok(left) + async fn run(&self) -> anyhow::Result<()> { + let (shutdown_tx, shutdown_rx) = watch::channel(()); + self.start_logging_task(shutdown_rx); + + // larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block. + // after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking. + // 16384 is a magic number, it's a trade-off between memory usage and performance. (usually tx count inside a block is under 8192, MAX_TXS_PER_BLOCK_IN_FIX) + let (tx, rx) = tokio::sync::mpsc::channel(2); + let producer = self.produce_tx(tx); + let consumer = self.consume_tx(rx); + + let result = self.join_producer_and_consumer(producer, consumer).await; + + // Send shutdown signal and ensure logging task exits + let _ = shutdown_tx.send(()); + result } - fn is_chunk_finished(&self, chunk_id: u128) -> anyhow::Result { - let segments = self.chunks.get(&chunk_id); - if segments.is_none() { - return Err(anyhow::anyhow!("chunk: {} not found", chunk_id)); - } - let mut tx_list = get_tx_list_from_chunk( - self.segment_dir.clone(), - chunk_id, - segments.unwrap().clone(), + fn update_startup_info_after_rollback( + &self, + execution_info: TransactionExecutionInfo, + ) -> anyhow::Result<()> { + let rollback_startup_info = + startup_info::StartupInfo::new(execution_info.state_root, execution_info.size); + + let inner_store = &self.rooch_db.rooch_store.store_instance; + let mut write_batch = WriteBatch::new(); + let cf_names = vec![CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME]; + + write_batch.put( + to_bytes(STARTUP_INFO_KEY).unwrap(), + to_bytes(&rollback_startup_info).unwrap(), )?; - let last_tx_in_chunk = tx_list - .last_mut() - .unwrap_or_else(|| panic!("chunk: {} tx_list is empty", chunk_id)); - let last_tx_hash = last_tx_in_chunk.tx_hash(); - self.is_tx_executed(last_tx_hash) - } - fn is_tx_executed(&self, tx_hash: H256) -> anyhow::Result { - let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; - Ok(execution_info.is_some()) + inner_store.write_batch_across_cfs(cf_names, write_batch, true) } async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { - let mut block_number = self.find_begin_chunk()?; + let last_executed_opt = self.tx_order_hash_block_getter.find_last_executed()?; + let mut next_tx_order = last_executed_opt + .clone() + .map(|v| v.tx_order + 1) + .unwrap_or(1); + let mut next_block_number = last_executed_opt + .clone() + .map(|v| v.block_number + 1) + .unwrap_or(0); + tracing::info!( + "next_tx_order: {:?}. need rollback soon: {:?}", + next_tx_order, + self.rollback.is_some() + ); + + // If rollback not set or ge executed_tx_order, start from executed_tx_order+1(nothing to do); otherwise, rollback to this order + if let (Some(rollback), Some(last_executed)) = (self.rollback, last_executed_opt.clone()) { + let last_executed_tx_order = last_executed.tx_order; + if rollback < last_executed_tx_order { + let new_last_and_rollback = self + .tx_order_hash_block_getter + .slice(rollback, last_executed_tx_order)?; + // split into two parts, the first get execution info for new startup, all others rollback + let (new_last, rollback_part) = new_last_and_rollback.split_first().unwrap(); + tracing::info!( + "Start to rollback transactions tx_order: [{}, {}]", + rollback_part.first().unwrap().tx_order, + rollback_part.last().unwrap().tx_order, + ); + for need_revert in rollback_part.iter() { + self.rooch_db + .revert_tx_unsafe(need_revert.tx_order, need_revert.tx_hash) + .map_err(|err| { + anyhow::anyhow!( + "Error reverting transaction {}: {:?}", + need_revert.tx_order, + err + ) + })?; + } + let rollback_execution_info = self + .tx_order_hash_block_getter + .get_execution_info(new_last.tx_hash)?; + self.update_startup_info_after_rollback(rollback_execution_info.unwrap())?; + next_block_number = new_last.block_number; + next_tx_order = rollback + 1; + tracing::info!("Rollback transactions done",); + } + }; - tracing::info!("Start to produce transactions from block: {}", block_number); + tracing::info!( + "Start to produce transactions from tx_order: {}, check from block: {}", + next_tx_order, + next_block_number, + ); let mut produced_tx_order = 0; - let mut executed = true; + let mut reach_end = false; loop { - let tx_list = self.load_ledger_tx_list(block_number)?; + if reach_end { + break; + } + let tx_list = self + .ledger_tx_getter + .load_ledger_tx_list(next_block_number, false)?; if tx_list.is_none() { - block_number -= 1; // no chunk belongs to this block_number + next_block_number -= 1; // no chunk belongs to this block_number break; } let tx_list = tx_list.unwrap(); - for mut ledger_tx in tx_list { + for ledger_tx in tx_list { let tx_order = ledger_tx.sequence_info.tx_order; if tx_order > self.tx_order_end { + reach_end = true; break; } - if executed { - let execution_info = self - .transaction_store - .get_tx_execution_info(ledger_tx.data.tx_hash())?; - if execution_info.is_some() { - continue; - } - tracing::info!("tx_order: {} is not executed, begin at here", tx_order); - executed = false; + if tx_order < next_tx_order { + continue; } let l1_block_with_body = match &ledger_tx.data { @@ -354,11 +364,11 @@ impl ExecInner { self.produced .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - block_number += 1; + next_block_number += 1; } tracing::info!( "All transactions are produced, max_block_number: {}, max_tx_order: {}", - block_number, + next_block_number, produced_tx_order ); Ok(()) @@ -376,7 +386,12 @@ impl ExecInner { let exec_msg = exec_msg_opt.unwrap(); let tx_order = exec_msg.tx_order; - self.execute(exec_msg).await?; + self.execute(exec_msg).await.with_context(|| { + format!( + "Error executing transaction: tx_order: {}, executed_tx_order: {}", + tx_order, executed_tx_order + ) + })?; executed_tx_order = tx_order; self.executed_tx_order @@ -402,22 +417,6 @@ impl ExecInner { Ok(()) } - fn load_ledger_tx_list( - &self, - block_number: u128, - ) -> anyhow::Result>> { - let segments = self.chunks.get(&block_number); - if segments.is_none() { - return Ok(None); - } - let tx_list = get_tx_list_from_chunk( - self.segment_dir.clone(), - block_number, - segments.unwrap().clone(), - )?; - Ok(Some(tx_list)) - } - async fn execute(&self, msg: ExecMsg) -> anyhow::Result<()> { let ExecMsg { tx_order, @@ -454,7 +453,7 @@ impl ExecInner { ) -> anyhow::Result<()> { let executor = self.executor.clone(); - let (_output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; + let (output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; let root = execution_info.root_metadata(); let expected_root_opt = self.order_state_pair.get(&tx_order); @@ -462,9 +461,9 @@ impl ExecInner { Some(expected_root) => { if root.state_root.unwrap() != *expected_root { return Err(anyhow::anyhow!( - "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}", + "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}; act_changeset: {:?}", tx_order, - *expected_root, root.state_root.unwrap() + *expected_root, root.state_root.unwrap(), output.changeset )); } tracing::info!( @@ -477,3 +476,67 @@ impl ExecInner { } } } + +async fn build_btc_client_proxy( + btc_rpc_url: String, + btc_rpc_user_name: String, + btc_rpc_password: String, + actor_system: &ActorSystem, +) -> anyhow::Result { + let bitcoin_client_config = BitcoinClientConfig { + btc_rpc_url, + btc_rpc_user_name, + btc_rpc_password, + }; + + let bitcoin_client = bitcoin_client_config.build()?; + let bitcoin_client_actor_ref = bitcoin_client + .into_actor(Some("bitcoin_client_for_rpc_service"), actor_system) + .await?; + Ok(BitcoinClientProxy::new(bitcoin_client_actor_ref.into())) +} + +async fn build_executor_and_store( + base_data_dir: Option, + chain_id: Option, + actor_system: &ActorSystem, + enable_rocks_stats: bool, +) -> anyhow::Result<(ExecutorProxy, MoveOSStore, RoochDB)> { + let registry_service = RegistryService::default(); + + let (root, rooch_db) = + build_rooch_db(base_data_dir.clone(), chain_id.clone(), enable_rocks_stats); + let (rooch_store, moveos_store) = (rooch_db.rooch_store.clone(), rooch_db.moveos_store.clone()); + + let executor_actor = ExecutorActor::new( + root.clone(), + moveos_store.clone(), + rooch_store.clone(), + ®istry_service.default_registry(), + None, + )?; + + let executor_actor_ref = executor_actor + .into_actor(Some("Executor"), actor_system) + .await?; + + let reader_executor = ReaderExecutorActor::new( + root.clone(), + moveos_store.clone(), + rooch_store.clone(), + None, + )?; + + let read_executor_ref = reader_executor + .into_actor(Some("ReadExecutor"), actor_system) + .await?; + + Ok(( + ExecutorProxy::new( + executor_actor_ref.clone().into(), + read_executor_ref.clone().into(), + ), + moveos_store, + rooch_db, + )) +} diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index d15995981d..84d739bb86 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use metrics::RegistryService; +use moveos_store::transaction_store::{TransactionDBStore, TransactionStore}; +use moveos_types::h256::H256; use moveos_types::moveos_std::object::ObjectMeta; +use moveos_types::transaction::TransactionExecutionInfo; use rooch_config::RoochOpt; use rooch_db::RoochDB; use rooch_types::da::chunk::chunk_from_segments; @@ -11,10 +14,12 @@ use rooch_types::rooch_network::RoochChainID; use rooch_types::transaction::LedgerTransaction; use std::collections::HashMap; use std::fs; +use std::fs::File; +use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; +pub mod dump_tx_order_hash; pub mod exec; -pub mod get_tx_order_hash; pub mod namespace; pub mod unpack; @@ -89,18 +94,18 @@ pub(crate) fn build_rooch_db( (root, rooch_db) } -pub struct LedgerTxLoader { +pub struct LedgerTxGetter { segment_dir: PathBuf, chunks: HashMap>, min_chunk_id: u128, max_chunk_id: u128, } -impl LedgerTxLoader { +impl LedgerTxGetter { pub fn new(segment_dir: PathBuf) -> anyhow::Result { let (chunks, min_chunk_id, max_chunk_id) = collect_chunks(segment_dir.clone())?; - Ok(LedgerTxLoader { + Ok(LedgerTxGetter { segment_dir, chunks, min_chunk_id, @@ -110,15 +115,19 @@ impl LedgerTxLoader { pub fn load_ledger_tx_list( &self, - block_number: u128, + chunk_id: u128, + must_has: bool, ) -> anyhow::Result>> { - let segments = self.chunks.get(&block_number); + let segments = self.chunks.get(&chunk_id); if segments.is_none() { + if must_has { + return Err(anyhow::anyhow!("No segment found in chunk {}", chunk_id)); + } return Ok(None); } let tx_list = get_tx_list_from_chunk( self.segment_dir.clone(), - block_number, + chunk_id, segments.unwrap().clone(), )?; Ok(Some(tx_list)) @@ -132,3 +141,126 @@ impl LedgerTxLoader { self.min_chunk_id } } + +#[derive(Debug, Clone)] +pub struct TxOrderHashBlock { + pub tx_order: u64, + pub tx_hash: H256, + pub block_number: u128, +} + +impl TxOrderHashBlock { + pub fn new(tx_order: u64, tx_hash: H256, block_number: u128) -> Self { + TxOrderHashBlock { + tx_order, + tx_hash, + block_number, + } + } +} + +impl std::fmt::Display for TxOrderHashBlock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{:?}:{}", + self.tx_order, self.tx_hash, self.block_number + ) + } +} + +impl std::str::FromStr for TxOrderHashBlock { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let parts: Vec<&str> = s.split(':').collect(); + if parts.len() != 3 { + return Err(anyhow::anyhow!("Invalid format")); + } + let tx_order = parts[0].parse::()?; + let tx_hash = H256::from_str(parts[1])?; + let block_number = parts[2].parse::()?; + Ok(TxOrderHashBlock { + tx_order, + tx_hash, + block_number, + }) + } +} + +pub struct TxOrderHashBlockGetter { + tx_order_hash_blocks: Vec, + transaction_store: TransactionDBStore, +} + +impl TxOrderHashBlockGetter { + pub fn load_from_file( + file_path: PathBuf, + transaction_store: TransactionDBStore, + ) -> anyhow::Result { + let mut tx_order_hashes = Vec::with_capacity(70000000); + let mut reader = BufReader::new(File::open(file_path)?); + for line in reader.by_ref().lines() { + let line = line?; + let item = line.parse::()?; + tx_order_hashes.push(item); + } + Ok(TxOrderHashBlockGetter { + tx_order_hash_blocks: tx_order_hashes, + transaction_store, + }) + } + + pub fn slice( + &self, + start_tx_order: u64, + end_tx_order: u64, + ) -> anyhow::Result> { + let r = self + .tx_order_hash_blocks + .binary_search_by(|x| x.tx_order.cmp(&(start_tx_order as u64))); + let start_idx = match r { + Ok(i) => i, + Err(_) => { + return Err(anyhow::anyhow!("start_tx_order not found")); + } + }; + let end_idx = start_idx + (end_tx_order - start_tx_order) as usize; + Ok(self.tx_order_hash_blocks[start_idx..end_idx + 1].to_vec()) + } + + pub fn find_last_executed(&self) -> anyhow::Result> { + // binary search + let mut left = 0; + let mut right = self.tx_order_hash_blocks.len() - 1; + while left < right { + let mid = (left + right) / 2; + let tx_order_hash_block = &self.tx_order_hash_blocks[mid]; + let tx_hash = tx_order_hash_block.tx_hash; + let executed = self.has_executed(tx_hash)?; + if executed { + left = mid + 1; + } else { + right = mid; + } + } + if self.has_executed(self.tx_order_hash_blocks[left].tx_hash)? { + Ok(None) + } else { + Ok(Some(self.tx_order_hash_blocks[left].clone())) + } + } + + pub fn has_executed(&self, tx_hash: H256) -> anyhow::Result { + let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; + Ok(execution_info.is_some()) + } + + pub fn get_execution_info( + &self, + tx_hash: H256, + ) -> anyhow::Result> { + let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?; + Ok(execution_info) + } +} diff --git a/crates/rooch/src/commands/da/commands/unpack.rs b/crates/rooch/src/commands/da/commands/unpack.rs index 5b985fd75d..fbbc4fffdd 100644 --- a/crates/rooch/src/commands/da/commands/unpack.rs +++ b/crates/rooch/src/commands/da/commands/unpack.rs @@ -118,7 +118,7 @@ impl UnpackInner { } fn collect_chunks(&mut self) -> anyhow::Result<()> { - let (chunks, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?; + let (chunks, _min_chunk_id, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?; self.chunks = chunks; Ok(()) } diff --git a/crates/rooch/src/commands/da/mod.rs b/crates/rooch/src/commands/da/mod.rs index 2e1e0b5545..a97a5a342c 100644 --- a/crates/rooch/src/commands/da/mod.rs +++ b/crates/rooch/src/commands/da/mod.rs @@ -4,8 +4,8 @@ pub mod commands; use crate::cli_types::CommandAction; +use crate::commands::da::commands::dump_tx_order_hash::DumpTxOrderHashCommand; use crate::commands::da::commands::exec::ExecCommand; -use crate::commands::da::commands::get_tx_order_hash::GetTxOrderHashCommand; use crate::commands::da::commands::namespace::NamespaceCommand; use crate::commands::da::commands::unpack::UnpackCommand; use async_trait::async_trait; @@ -26,8 +26,8 @@ impl CommandAction for DA { DACommand::Unpack(unpack) => unpack.execute().map(|_| "".to_owned()), DACommand::Namespace(namespace) => namespace.execute().map(|_| "".to_owned()), DACommand::Exec(exec) => exec.execute().await.map(|_| "".to_owned()), - DACommand::GetTxOrderHash(get_tx_order_hash) => { - get_tx_order_hash.execute().map(|_| "".to_owned()) + DACommand::DumpTxOrderHash(dump_tx_order_hash) => { + dump_tx_order_hash.execute().map(|_| "".to_owned()) } } } @@ -39,5 +39,5 @@ pub enum DACommand { Unpack(UnpackCommand), Namespace(NamespaceCommand), Exec(ExecCommand), - GetTxOrderHash(GetTxOrderHashCommand), + DumpTxOrderHash(DumpTxOrderHashCommand), } diff --git a/crates/rooch/src/commands/db/commands/rollback.rs b/crates/rooch/src/commands/db/commands/rollback.rs index af7da7a592..a463ee7aa9 100644 --- a/crates/rooch/src/commands/db/commands/rollback.rs +++ b/crates/rooch/src/commands/db/commands/rollback.rs @@ -118,9 +118,9 @@ impl RollbackCommand { ); // tx_hash lost: // 1. rollback incomplete cause last_order not updated - // 2. database inconsistent (use other method to check/repair) + // 2. database is inconsistent (use another method to check/repair) // - // it's okay to continue rollback, after revert all txs, the last_order will be updated later + // it's okay to continue rollback, after reverting all txs; the last_order will be updated later continue; } let tx_hash = tx_hashes[0].unwrap(); From 4eb0a9d3cb550881a44fcdad80520447a0db6fef Mon Sep 17 00:00:00 2001 From: popcnt1 Date: Thu, 19 Dec 2024 06:08:58 +0800 Subject: [PATCH 3/5] fix(rooch-da): improve buffer size and fix binary search issues Increase mpsc channel buffer size to 16384 for performance improvement and resolve edge cases in binary search for finding the last executed transaction. Ensures better handling of empty lists and accurate last executed transaction detection. --- crates/rooch/src/commands/da/commands/exec.rs | 2 +- crates/rooch/src/commands/da/commands/mod.rs | 20 ++++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/crates/rooch/src/commands/da/commands/exec.rs b/crates/rooch/src/commands/da/commands/exec.rs index 2aa4b5a2ed..d0020e4c99 100644 --- a/crates/rooch/src/commands/da/commands/exec.rs +++ b/crates/rooch/src/commands/da/commands/exec.rs @@ -233,7 +233,7 @@ impl ExecInner { // larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block. // after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking. // 16384 is a magic number, it's a trade-off between memory usage and performance. (usually tx count inside a block is under 8192, MAX_TXS_PER_BLOCK_IN_FIX) - let (tx, rx) = tokio::sync::mpsc::channel(2); + let (tx, rx) = tokio::sync::mpsc::channel(16384); let producer = self.produce_tx(tx); let consumer = self.consume_tx(rx); diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index 84d739bb86..5caa9c4be2 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -230,22 +230,32 @@ impl TxOrderHashBlockGetter { } pub fn find_last_executed(&self) -> anyhow::Result> { - // binary search + // Check for an empty list + if self.tx_order_hash_blocks.is_empty() { + return Ok(None); + } + + // Binary search let mut left = 0; let mut right = self.tx_order_hash_blocks.len() - 1; while left < right { let mid = (left + right) / 2; let tx_order_hash_block = &self.tx_order_hash_blocks[mid]; - let tx_hash = tx_order_hash_block.tx_hash; - let executed = self.has_executed(tx_hash)?; + let executed = self.has_executed(tx_order_hash_block.tx_hash)?; if executed { left = mid + 1; } else { right = mid; } } - if self.has_executed(self.tx_order_hash_blocks[left].tx_hash)? { - Ok(None) + + // Determine result + let last_executed = self.has_executed(self.tx_order_hash_blocks[left].tx_hash)?; + if left == 0 && !last_executed { + return Ok(None); + } + if !last_executed { + Ok(Some(self.tx_order_hash_blocks[left - 1].clone())) } else { Ok(Some(self.tx_order_hash_blocks[left].clone())) } From 0c879cb4e628b46daeebb1962d06548c1a7e5507 Mon Sep 17 00:00:00 2001 From: popcnt1 Date: Thu, 19 Dec 2024 06:14:49 +0800 Subject: [PATCH 4/5] fix(rooch-da): improve tx_order comparison and error handling Simplifies the `binary_search_by` logic and refines error handling for `tx_order` mismatch. Ensures alignment with expected behavior and improves code readability. --- .../commands/da/commands/dump_tx_order_hash.rs | 16 +++++++--------- crates/rooch/src/commands/da/commands/mod.rs | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs b/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs index 60b102efdd..cf640a96fa 100644 --- a/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs +++ b/crates/rooch/src/commands/da/commands/dump_tx_order_hash.rs @@ -36,19 +36,17 @@ impl DumpTxOrderHashCommand { let tx_hash = ledger_tx.tx_hash(); if expected_tx_order == 0 { expected_tx_order = tx_order; - } else { - if tx_order != expected_tx_order { - return Err(RoochError::from(anyhow::anyhow!( - "tx_order mismatch: expected {}, got {}", - expected_tx_order, - tx_order - ))); - } + } else if tx_order != expected_tx_order { + return Err(RoochError::from(anyhow::anyhow!( + "tx_order mismatch: expected {}, got {}", + expected_tx_order, + tx_order + ))); } writeln!( writer, "{}", - TxOrderHashBlock::new(tx_order, tx_hash, block_number).to_string() + TxOrderHashBlock::new(tx_order, tx_hash, block_number) )?; expected_tx_order += 1; } diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index 5caa9c4be2..85fec98f3a 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -218,7 +218,7 @@ impl TxOrderHashBlockGetter { ) -> anyhow::Result> { let r = self .tx_order_hash_blocks - .binary_search_by(|x| x.tx_order.cmp(&(start_tx_order as u64))); + .binary_search_by(|x| x.tx_order.cmp(&start_tx_order)); let start_idx = match r { Ok(i) => i, Err(_) => { From 04fffbefafaabbddc1ac9bac791aa0478817c212 Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Thu, 19 Dec 2024 12:40:39 +0800 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/rooch/src/commands/db/commands/rollback.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rooch/src/commands/db/commands/rollback.rs b/crates/rooch/src/commands/db/commands/rollback.rs index a463ee7aa9..abe0ebde72 100644 --- a/crates/rooch/src/commands/db/commands/rollback.rs +++ b/crates/rooch/src/commands/db/commands/rollback.rs @@ -118,7 +118,7 @@ impl RollbackCommand { ); // tx_hash lost: // 1. rollback incomplete cause last_order not updated - // 2. database is inconsistent (use another method to check/repair) + // 2. the database is inconsistent (use another method to check/repair) // // it's okay to continue rollback, after reverting all txs; the last_order will be updated later continue;