diff --git a/Cargo.lock b/Cargo.lock index 6aa0e67971..de649aa99c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10062,9 +10062,11 @@ dependencies = [ "async-trait", "bcs", "bitcoin 0.32.3", + "bitcoin-client", "ciborium", "clap 4.5.17", "codespan-reporting", + "coerce", "csv", "datatest-stable 0.1.1", "fastcrypto 0.1.8 (git+https://github.com/MystenLabs/fastcrypto?rev=56f6223b84ada922b6cb2c672c69db2ea3dc6a13)", @@ -10106,6 +10108,7 @@ dependencies = [ "rooch-common", "rooch-config", "rooch-db", + "rooch-executor", "rooch-faucet", "rooch-framework", "rooch-genesis", diff --git a/crates/rooch-genesis/src/lib.rs b/crates/rooch-genesis/src/lib.rs index 93e2428dbc..c8a8b1de07 100644 --- a/crates/rooch-genesis/src/lib.rs +++ b/crates/rooch-genesis/src/lib.rs @@ -402,11 +402,6 @@ impl RoochGenesis { "Genesis output mismatch" ); - let tx_hash = self.genesis_tx().tx_hash(); - let (output, genesis_execution_info) = rooch_db - .moveos_store - .handle_tx_output(tx_hash, genesis_raw_output.clone())?; - // Save the genesis txs to sequencer let genesis_tx_order: u64 = 0; let moveos_genesis_context = self @@ -440,6 +435,11 @@ impl RoochGenesis { genesis_accumulator_unsaved_nodes, )?; + let tx_hash = self.genesis_tx().tx_hash(); + let (output, genesis_execution_info) = rooch_db + .moveos_store + .handle_tx_output(tx_hash, genesis_raw_output.clone())?; + // Save genesis tx state change set let state_change_set_ext = StateChangeSetExt::new( output.changeset.clone(), diff --git a/crates/rooch-pipeline-processor/src/actor/processor.rs b/crates/rooch-pipeline-processor/src/actor/processor.rs index 12681cb099..b9d893a498 100644 --- a/crates/rooch-pipeline-processor/src/actor/processor.rs +++ b/crates/rooch-pipeline-processor/src/actor/processor.rs @@ -135,7 +135,7 @@ impl PipelineProcessorActor { } None => { return Err(anyhow::anyhow!( - "The bitcoin client proxy should be initialized before processing the sequenced l1_block_tx(block: {:?} on startup", block + "The bitcoin client proxy should be initialized before processing the sequenced l1_block_tx(block: {:?}) on startup", block )); } } diff --git a/crates/rooch-rpc-server/src/lib.rs b/crates/rooch-rpc-server/src/lib.rs index 758ee00a47..ae0670b117 100644 --- a/crates/rooch-rpc-server/src/lib.rs +++ b/crates/rooch-rpc-server/src/lib.rs @@ -237,7 +237,7 @@ pub async fn run_start_server(opt: RoochOpt, server_opt: ServerOpt) -> Result Result, + + /// If local chainid, start the service with a temporary data store. + /// All data will be deleted when the service is stopped. + #[clap(long, short = 'n', help = R_OPT_NET_HELP)] + pub chain_id: Option, + + #[clap(long = "btc-rpc-url")] + pub btc_rpc_url: String, + #[clap(long = "btc-rpc-user-name")] + pub btc_rpc_user_name: String, + #[clap(long = "btc-rpc-password")] + pub btc_rpc_password: String, +} + +async fn build_btc_client_proxy( + btc_rpc_url: String, + btc_rpc_user_name: String, + btc_rpc_password: String, + actor_system: &ActorSystem, +) -> anyhow::Result { + let bitcoin_client_config = BitcoinClientConfig { + btc_rpc_url, + btc_rpc_user_name, + btc_rpc_password, + }; + + let bitcoin_client = bitcoin_client_config.build()?; + let bitcoin_client_actor_ref = bitcoin_client + .into_actor(Some("bitcoin_client_for_rpc_service"), actor_system) + .await?; + Ok(BitcoinClientProxy::new(bitcoin_client_actor_ref.into())) +} + +fn build_rooch_db( + base_data_dir: Option, + chain_id: Option, +) -> (ObjectMeta, RoochDB) { + let opt = RoochOpt::new_with_default(base_data_dir, chain_id, None).unwrap(); + let registry_service = RegistryService::default(); + let rooch_db = RoochDB::init(opt.store_config(), ®istry_service.default_registry()).unwrap(); + let root = rooch_db.latest_root().unwrap().unwrap(); + (root, rooch_db) +} + +async fn build_executor_and_store( + base_data_dir: Option, + chain_id: Option, + actor_system: &ActorSystem, +) -> anyhow::Result<(ExecutorProxy, MoveOSStore)> { + let registry_service = RegistryService::default(); + + let (root, rooch_db) = build_rooch_db(base_data_dir.clone(), chain_id.clone()); + let (rooch_store, moveos_store) = (rooch_db.rooch_store.clone(), rooch_db.moveos_store.clone()); + + let executor_actor = ExecutorActor::new( + root.clone(), + moveos_store.clone(), + rooch_store.clone(), + ®istry_service.default_registry(), + None, + )?; + + let executor_actor_ref = executor_actor + .into_actor(Some("Executor"), actor_system) + .await?; + + let reader_executor = ReaderExecutorActor::new( + root.clone(), + moveos_store.clone(), + rooch_store.clone(), + None, + )?; + + let read_executor_ref = reader_executor + .into_actor(Some("ReadExecutor"), actor_system) + .await?; + + Ok(( + ExecutorProxy::new( + executor_actor_ref.clone().into(), + read_executor_ref.clone().into(), + ), + moveos_store, + )) +} + +impl ExecCommand { + pub async fn execute(self) -> RoochResult<()> { + let exec_inner = self.build_exec_inner().await?; + exec_inner.run().await?; + Ok(()) + } + + async fn build_exec_inner(&self) -> anyhow::Result { + let actor_system = ActorSystem::global_system(); + let bitcoin_client_proxy = build_btc_client_proxy( + self.btc_rpc_url.clone(), + self.btc_rpc_user_name.clone(), + self.btc_rpc_password.clone(), + &actor_system, + ) + .await?; + let (executor, moveos_store) = build_executor_and_store( + self.base_data_dir.clone(), + self.chain_id.clone(), + &actor_system, + ) + .await?; + + let (order_state_pair, tx_order_end) = self.load_order_state_pair(); + let chunks = collect_chunks(self.segment_dir.clone())?; + Ok(ExecInner { + segment_dir: self.segment_dir.clone(), + chunks, + order_state_pair, + tx_order_end, + bitcoin_client_proxy, + executor, + transaction_store: moveos_store.transaction_store, + produced: Arc::new(AtomicU64::new(0)), + done: Arc::new(AtomicU64::new(0)), + verified_tx_order: Arc::new(AtomicU64::new(0)), + }) + } + + fn load_order_state_pair(&self) -> (HashMap, u64) { + let mut order_state_pair = HashMap::new(); + let mut tx_order_end = 0; + + let mut reader = BufReader::new(File::open(self.order_state_path.clone()).unwrap()); + // collect all `tx_order:state_root` pairs + for line in reader.by_ref().lines() { + let line = line.unwrap(); + let parts: Vec<&str> = line.split(':').collect(); + let tx_order = parts[0].parse::().unwrap(); + let state_root = H256::from_str(parts[1]).unwrap(); + order_state_pair.insert(tx_order, state_root); + if tx_order > tx_order_end { + tx_order_end = tx_order; + } + } + (order_state_pair, tx_order_end) + } +} + +struct ExecInner { + segment_dir: PathBuf, + chunks: HashMap>, + order_state_pair: HashMap, + tx_order_end: u64, + + bitcoin_client_proxy: BitcoinClientProxy, + executor: ExecutorProxy, + + transaction_store: TransactionDBStore, + + // stats + produced: Arc, + done: Arc, + verified_tx_order: Arc, +} + +struct ExecMsg { + tx_order: u64, + ledger_tx: LedgerTransaction, + l1_block_with_body: Option, +} + +impl ExecInner { + async fn run(&self) -> anyhow::Result<()> { + let done_clone = self.done.clone(); + let verified_tx_order_clone = self.verified_tx_order.clone(); + let produced_clone = self.produced.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + let done = done_clone.load(std::sync::atomic::Ordering::Relaxed); + let verified_tx_order = + verified_tx_order_clone.load(std::sync::atomic::Ordering::Relaxed); + let produced = produced_clone.load(std::sync::atomic::Ordering::Relaxed); + tracing::info!( + "produced: {}, done: {}, verified_tx_order: {}", + produced, + done, + verified_tx_order + ); + } + }); + + let (tx, rx) = tokio::sync::mpsc::channel(16); + let producer = self.produce_tx(tx); + let consumer = self.consume_tx(rx); + + let (producer_result, consumer_result) = tokio::join!(producer, consumer); + match (producer_result, consumer_result) { + (Ok(()), Ok(())) => Ok(()), // Both succeeded + (Err(producer_err), Ok(())) => Err(producer_err.context("Error in producer")), + (Ok(()), Err(consumer_err)) => Err(consumer_err.context("Error in consumer")), + (Err(producer_err), Err(consumer_err)) => { + let combined_error = producer_err.context("Error in producer"); + Err(combined_error.context(format!("Error in consumer: {:?}", consumer_err))) + } + } + } + + async fn produce_tx(&self, tx: Sender) -> anyhow::Result<()> { + tracing::info!("Start to produce transactions"); + let mut block_number = 0; + let mut produced_tx_order = 0; + let mut executed = true; + loop { + let tx_list = self.load_ledger_tx_list(block_number)?; + if tx_list.is_none() { + block_number -= 1; // no chunk belongs to this block_number + break; + } + let tx_list = tx_list.unwrap(); + for mut ledger_tx in tx_list { + let tx_order = ledger_tx.sequence_info.tx_order; + if tx_order > self.tx_order_end { + break; + } + if executed { + let execution_info = self + .transaction_store + .get_tx_execution_info(ledger_tx.data.tx_hash())?; + if execution_info.is_some() { + continue; + } + tracing::info!("tx_order: {} is not executed, begin at here", tx_order); + executed = false; + } + + let l1_block_with_body = match &ledger_tx.data { + LedgerTxData::L1Block(block) => { + let block_hash_vec = block.block_hash.clone(); + let block_hash = bitcoin::block::BlockHash::from_slice(&block_hash_vec)?; + let btc_block = self.bitcoin_client_proxy.get_block(block_hash).await?; + let block_body = BitcoinBlock::from(btc_block); + Some(L1BlockWithBody::new(block.clone(), block_body.encode())) + } + _ => None, + }; + + tx.send(ExecMsg { + tx_order, + ledger_tx, + l1_block_with_body, + }) + .await?; + produced_tx_order = tx_order; + self.produced + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + block_number += 1; + } + tracing::info!( + "All transactions are produced, max_block_number: {}, max_tx_order: {}", + block_number, + produced_tx_order + ); + Ok(()) + } + + async fn consume_tx(&self, mut rx: Receiver) -> anyhow::Result<()> { + tracing::info!("Start to consume transactions"); + let mut verified_tx_order = 0; + let mut last_record_time = std::time::Instant::now(); + loop { + let exec_msg_opt = rx.recv().await; + if exec_msg_opt.is_none() { + break; + } + let exec_msg = exec_msg_opt.unwrap(); + let tx_order = exec_msg.tx_order; + + self.execute(exec_msg).await?; + + verified_tx_order = tx_order; + self.verified_tx_order + .store(verified_tx_order, std::sync::atomic::Ordering::Relaxed); + let done = self.done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + + if done % 10000 == 0 { + let elapsed = last_record_time.elapsed(); + tracing::info!( + "execute tx range: [{}, {}], cost: {:?}, avg: {:.3} ms/tx", + tx_order + 1 - 10000, // add first, avoid overflow + tx_order, + elapsed, + elapsed.as_millis() as f64 / 10000f64 + ); + last_record_time = std::time::Instant::now(); + } + } + tracing::info!( + "All transactions execution state root are strictly equal to RoochNetwork: [0, {}]", + verified_tx_order + ); + Ok(()) + } + + fn load_ledger_tx_list( + &self, + block_number: u128, + ) -> anyhow::Result>> { + let segments = self.chunks.get(&block_number); + if segments.is_none() { + return Ok(None); + } + let tx_list = get_tx_list_from_chunk( + self.segment_dir.clone(), + block_number, + segments.unwrap().clone(), + )?; + Ok(Some(tx_list)) + } + + async fn execute(&self, msg: ExecMsg) -> anyhow::Result<()> { + let ExecMsg { + tx_order, + ledger_tx, + l1_block_with_body, + } = msg; + let moveos_tx = self + .validate_ledger_transaction(ledger_tx, l1_block_with_body) + .await?; + self.execute_moveos_tx(tx_order, moveos_tx).await + } + + async fn validate_ledger_transaction( + &self, + ledger_tx: LedgerTransaction, + l1block_with_body: Option, + ) -> anyhow::Result { + let moveos_tx = match &ledger_tx.data { + LedgerTxData::L1Block(_block) => { + self.executor + .validate_l1_block(l1block_with_body.unwrap()) + .await? + } + LedgerTxData::L1Tx(l1_tx) => self.executor.validate_l1_tx(l1_tx.clone()).await?, + LedgerTxData::L2Tx(l2_tx) => self.executor.validate_l2_tx(l2_tx.clone()).await?, + }; + Ok(moveos_tx) + } + + async fn execute_moveos_tx( + &self, + tx_order: u64, + moveos_tx: VerifiedMoveOSTransaction, + ) -> anyhow::Result<()> { + let executor = self.executor.clone(); + + let (_output, execution_info) = executor.execute_transaction(moveos_tx.clone()).await?; + + let root = execution_info.root_metadata(); + let expected_root_opt = self.order_state_pair.get(&tx_order); + match expected_root_opt { + Some(expected_root) => { + if root.state_root.unwrap() != *expected_root { + return Err(anyhow::anyhow!( + "Execution state root is not equal to RoochNetwork: tx_order: {}, exp: {:?}, act: {:?}", + tx_order, + *expected_root, root.state_root.unwrap() + )); + } + Ok(()) + } + None => Ok(()), + } + } +} diff --git a/crates/rooch/src/commands/da/commands/mod.rs b/crates/rooch/src/commands/da/commands/mod.rs index 9688e97e39..156c38fd6b 100644 --- a/crates/rooch/src/commands/da/commands/mod.rs +++ b/crates/rooch/src/commands/da/commands/mod.rs @@ -1,5 +1,58 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use rooch_types::da::chunk::chunk_from_segments; +use rooch_types::da::segment::{segment_from_bytes, SegmentID}; +use rooch_types::transaction::LedgerTransaction; +use std::collections::HashMap; +use std::fs; +use std::path::PathBuf; + +pub mod exec; pub mod namespace; pub mod unpack; + +// collect all the chunks from segment_dir. +// each segment is stored in a file named by the segment_id. +// each chunk may contain multiple segments. +// we collect all the chunks and their segment numbers to unpack them later. +pub(crate) fn collect_chunks(segment_dir: PathBuf) -> anyhow::Result>> { + let mut chunks = HashMap::new(); + for entry in fs::read_dir(segment_dir)?.flatten() { + let path = entry.path(); + if path.is_file() { + if let Some(segment_id) = path + .file_name() + .and_then(|s| s.to_str()?.parse::().ok()) + { + let chunk_id = segment_id.chunk_id; + let segment_number = segment_id.segment_number; + let segments: &mut Vec = chunks.entry(chunk_id).or_default(); + segments.push(segment_number); + } + } + } + Ok(chunks) +} + +pub(crate) fn get_tx_list_from_chunk( + segment_dir: PathBuf, + chunk_id: u128, + segment_numbers: Vec, +) -> anyhow::Result> { + let mut segments = Vec::new(); + for segment_number in segment_numbers { + let segment_id = SegmentID { + chunk_id, + segment_number, + }; + let segment_path = segment_dir.join(segment_id.to_string()); + let segment_bytes = fs::read(segment_path)?; + let segment = segment_from_bytes(&segment_bytes)?; + segments.push(segment); + } + let chunk = chunk_from_segments(segments)?; + let batch = chunk.get_batches().into_iter().next().unwrap(); + batch.verify(true)?; + Ok(batch.get_tx_list()) +} diff --git a/crates/rooch/src/commands/da/commands/namespace.rs b/crates/rooch/src/commands/da/commands/namespace.rs index d22edd3115..cccf794dc6 100644 --- a/crates/rooch/src/commands/da/commands/namespace.rs +++ b/crates/rooch/src/commands/da/commands/namespace.rs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use clap::Parser; +use hex::encode; +use moveos_types::h256::sha2_256_of; use rooch_config::da_config::derive_genesis_namespace; use rooch_types::error::RoochResult; use std::path::PathBuf; @@ -16,8 +18,10 @@ pub struct NamespaceCommand { impl NamespaceCommand { pub fn execute(self) -> RoochResult<()> { let genesis_bytes = std::fs::read(&self.genesis_file_path)?; + let full_hash = encode(sha2_256_of(&genesis_bytes).0); let namespace = derive_genesis_namespace(&genesis_bytes); println!("DA genesis namespace: {}", namespace); + println!("DA genesis full hash: {}", full_hash); Ok(()) } } diff --git a/crates/rooch/src/commands/da/commands/unpack.rs b/crates/rooch/src/commands/da/commands/unpack.rs index 9a055d3106..a6278a8c6d 100644 --- a/crates/rooch/src/commands/da/commands/unpack.rs +++ b/crates/rooch/src/commands/da/commands/unpack.rs @@ -1,9 +1,8 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 +use crate::commands::da::commands::{collect_chunks, get_tx_list_from_chunk}; use clap::Parser; -use rooch_types::da::chunk::chunk_from_segments; -use rooch_types::da::segment::{segment_from_bytes, SegmentID}; use rooch_types::error::RoochResult; use std::collections::{HashMap, HashSet}; use std::fs; @@ -118,25 +117,9 @@ impl UnpackInner { Ok(()) } - // collect all the chunks from segment_dir. - // each segment is stored in a file named by the segment_id. - // each chunk may contain multiple segments. - // we collect all the chunks and their segment numbers to unpack them later. fn collect_chunks(&mut self) -> anyhow::Result<()> { - for entry in fs::read_dir(&self.segment_dir)?.flatten() { - let path = entry.path(); - if path.is_file() { - if let Some(segment_id) = path - .file_name() - .and_then(|s| s.to_str()?.parse::().ok()) - { - let chunk_id = segment_id.chunk_id; - let segment_number = segment_id.segment_number; - let segments = self.chunks.entry(chunk_id).or_default(); - segments.push(segment_number); - } - } - } + let chunks = collect_chunks(self.segment_dir.clone())?; + self.chunks = chunks; Ok(()) } @@ -154,20 +137,11 @@ impl UnpackInner { continue; } - let mut segments = Vec::new(); - for segment_number in segment_numbers { - let segment_id = SegmentID { - chunk_id: *chunk_id, - segment_number: *segment_number, - }; - let segment_path = self.segment_dir.join(segment_id.to_string()); - let segment_bytes = fs::read(segment_path)?; - let segment = segment_from_bytes(&segment_bytes)?; - segments.push(segment); - } - let chunk = chunk_from_segments(segments)?; - let batch = chunk.get_batches().into_iter().next().unwrap(); - batch.verify(true)?; + let tx_list = get_tx_list_from_chunk( + self.segment_dir.clone(), + *chunk_id, + segment_numbers.clone(), + )?; // write LedgerTx in batch to file, each line is a tx in json let batch_file_path = self.batch_dir.join(chunk_id.to_string()); @@ -178,7 +152,6 @@ impl UnpackInner { .open(batch_file_path)?; let mut writer = BufWriter::with_capacity(8 * 1024 * 1024, file.try_clone().unwrap()); - let tx_list = batch.get_tx_list(); for tx in tx_list { let tx_json = serde_json::to_string(&tx)?; writeln!(writer, "{}", tx_json).expect("Unable to write line"); diff --git a/crates/rooch/src/commands/da/mod.rs b/crates/rooch/src/commands/da/mod.rs index 996d04918c..9faa8e0c8c 100644 --- a/crates/rooch/src/commands/da/mod.rs +++ b/crates/rooch/src/commands/da/mod.rs @@ -4,6 +4,7 @@ pub mod commands; use crate::cli_types::CommandAction; +use crate::commands::da::commands::exec::ExecCommand; use crate::commands::da::commands::namespace::NamespaceCommand; use crate::commands::da::commands::unpack::UnpackCommand; use async_trait::async_trait; @@ -21,10 +22,9 @@ pub struct DA { impl CommandAction for DA { async fn execute(self) -> RoochResult { match self.cmd { - DACommand::Unpack(unpack) => unpack.execute().map(|resp| { - serde_json::to_string_pretty(&resp).expect("Failed to serialize response") - }), + DACommand::Unpack(unpack) => unpack.execute().map(|_| "".to_owned()), DACommand::Namespace(namespace) => namespace.execute().map(|_| "".to_owned()), + DACommand::Exec(exec) => exec.execute().await.map(|_| "".to_owned()), } } } @@ -34,4 +34,5 @@ impl CommandAction for DA { pub enum DACommand { Unpack(UnpackCommand), Namespace(NamespaceCommand), + Exec(ExecCommand), } diff --git a/crates/rooch/src/commands/statedb/commands/re_genesis.rs b/crates/rooch/src/commands/statedb/commands/re_genesis.rs index bd6950d790..aec128d9f0 100644 --- a/crates/rooch/src/commands/statedb/commands/re_genesis.rs +++ b/crates/rooch/src/commands/statedb/commands/re_genesis.rs @@ -35,7 +35,7 @@ impl Display for ReGenesisMode { impl FromStr for ReGenesisMode { type Err = &'static str; - fn from_str(s: &str) -> std::result::Result { + fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { "export" => Ok(ReGenesisMode::Export), "remove" => Ok(ReGenesisMode::Remove), @@ -58,7 +58,7 @@ pub struct ReGenesisCommand { pub chain_id: Option, #[clap(long)] - pub export_path: PathBuf, + pub export_path: Option, #[clap(long)] pub mode: Option, } @@ -77,8 +77,9 @@ impl ReGenesisCommand { fn export(&self) { let rooch_db = init_rooch_db(self.base_data_dir.clone(), self.chain_id.clone()); + let export_path = self.export_path.clone().unwrap(); - let writer = std::fs::File::create(self.export_path.clone()).unwrap(); + let writer = std::fs::File::create(export_path).unwrap(); let mut writer = std::io::BufWriter::new(writer); let mut outputs = Vec::new(); @@ -137,7 +138,8 @@ impl ReGenesisCommand { } fn restore(&self) { - let reader = std::fs::File::open(self.export_path.clone()).unwrap(); + let export_path = self.export_path.clone().unwrap(); + let reader = std::fs::File::open(export_path).unwrap(); let reader = std::io::BufReader::new(reader); let mut lines = reader.lines(); let genesis_info = serde_json::from_str(&lines.next().unwrap().unwrap()).unwrap();