Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
baicaiyihao authored Dec 17, 2024
2 parents c060eea + 233f95b commit 31d8f81
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 21 deletions.
8 changes: 8 additions & 0 deletions crates/rooch-config/src/store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ pub struct StoreConfig {
)]
pub max_write_buffer_number: Option<u64>,

#[clap(
name = "rocksdb-enable-statistics",
long,
help = "rocksdb enable statistics"
)]
pub enable_statistics: bool,

#[serde(skip)]
#[clap(skip)]
base: Option<Arc<BaseConfig>>,
Expand Down Expand Up @@ -147,6 +154,7 @@ impl StoreConfig {
.unwrap_or(default.max_write_buffer_numer),
block_cache_size: self.block_cache_size.unwrap_or(block_cache_size),
block_size: self.block_size.unwrap_or(default.block_size),
enable_statistics: self.enable_statistics,
}
}

Expand Down
79 changes: 63 additions & 16 deletions crates/rooch/src/commands/da/commands/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,19 @@ impl ExecCommand {
.await?;

let (order_state_pair, tx_order_end) = self.load_order_state_pair();
let chunks = collect_chunks(self.segment_dir.clone())?;
let (chunks, max_chunk_id) = collect_chunks(self.segment_dir.clone())?;
Ok(ExecInner {
segment_dir: self.segment_dir.clone(),
chunks,
max_chunk_id,
order_state_pair,
tx_order_end,
bitcoin_client_proxy,
executor,
transaction_store: moveos_store.transaction_store,
produced: Arc::new(AtomicU64::new(0)),
done: Arc::new(AtomicU64::new(0)),
verified_tx_order: Arc::new(AtomicU64::new(0)),
executed_tx_order: Arc::new(AtomicU64::new(0)),
})
}

Expand All @@ -197,6 +198,7 @@ impl ExecCommand {
struct ExecInner {
segment_dir: PathBuf,
chunks: HashMap<u128, Vec<u64>>,
max_chunk_id: u128,
order_state_pair: HashMap<u64, H256>,
tx_order_end: u64,

Expand All @@ -208,7 +210,7 @@ struct ExecInner {
// stats
produced: Arc<AtomicU64>,
done: Arc<AtomicU64>,
verified_tx_order: Arc<AtomicU64>,
executed_tx_order: Arc<AtomicU64>,
}

struct ExecMsg {
Expand All @@ -220,25 +222,28 @@ struct ExecMsg {
impl ExecInner {
async fn run(&self) -> anyhow::Result<()> {
let done_clone = self.done.clone();
let verified_tx_order_clone = self.verified_tx_order.clone();
let executed_tx_order_clone = self.executed_tx_order.clone();
let produced_clone = self.produced.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let done = done_clone.load(std::sync::atomic::Ordering::Relaxed);
let verified_tx_order =
verified_tx_order_clone.load(std::sync::atomic::Ordering::Relaxed);
let executed_tx_order =
executed_tx_order_clone.load(std::sync::atomic::Ordering::Relaxed);
let produced = produced_clone.load(std::sync::atomic::Ordering::Relaxed);
tracing::info!(
"produced: {}, done: {}, verified_tx_order: {}",
"produced: {}, done: {}, max executed_tx_order: {}",
produced,
done,
verified_tx_order
executed_tx_order
);
}
});

let (tx, rx) = tokio::sync::mpsc::channel(16);
// larger buffer size to avoid rx starving caused by consumer has to access disks and request btc block.
// after consumer load data(ledger_tx) from disk/btc client, burst to executor, need large buffer to avoid blocking.
// 16384 is a magic number, it's a trade-off between memory usage and performance. (usually tx count inside a block is under 8192, MAX_TXS_PER_BLOCK_IN_FIX)
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let producer = self.produce_tx(tx);
let consumer = self.consume_tx(rx);

Expand All @@ -254,9 +259,47 @@ impl ExecInner {
}
}

fn find_begin_chunk(&self) -> anyhow::Result<u128> {
// binary-search from chunk [0, max_chunk_id], find max chunk_id that is finished.
let mut left = 0;
let mut right = self.max_chunk_id;
while left < right {
let mid = left + (right - left) / 2;
if self.is_chunk_finished(mid)? {
left = mid + 1;
} else {
right = mid;
}
}
Ok(left)
}

fn is_chunk_finished(&self, chunk_id: u128) -> anyhow::Result<bool> {
let segments = self.chunks.get(&chunk_id);
if segments.is_none() {
return Err(anyhow::anyhow!("chunk: {} not found", chunk_id));
}
let mut tx_list = get_tx_list_from_chunk(
self.segment_dir.clone(),
chunk_id,
segments.unwrap().clone(),
)?;
let last_tx_in_chunk = tx_list
.last_mut()
.unwrap_or_else(|| panic!("chunk: {} tx_list is empty", chunk_id));
let last_tx_hash = last_tx_in_chunk.tx_hash();
self.is_tx_executed(last_tx_hash)
}

fn is_tx_executed(&self, tx_hash: H256) -> anyhow::Result<bool> {
let execution_info = self.transaction_store.get_tx_execution_info(tx_hash)?;
Ok(execution_info.is_some())
}

async fn produce_tx(&self, tx: Sender<ExecMsg>) -> anyhow::Result<()> {
tracing::info!("Start to produce transactions");
let mut block_number = 0;
let mut block_number = self.find_begin_chunk()?;

tracing::info!("Start to produce transactions from block: {}", block_number);
let mut produced_tx_order = 0;
let mut executed = true;
loop {
Expand Down Expand Up @@ -315,7 +358,7 @@ impl ExecInner {

async fn consume_tx(&self, mut rx: Receiver<ExecMsg>) -> anyhow::Result<()> {
tracing::info!("Start to consume transactions");
let mut verified_tx_order = 0;
let mut executed_tx_order = 0;
let mut last_record_time = std::time::Instant::now();
loop {
let exec_msg_opt = rx.recv().await;
Expand All @@ -327,9 +370,9 @@ impl ExecInner {

self.execute(exec_msg).await?;

verified_tx_order = tx_order;
self.verified_tx_order
.store(verified_tx_order, std::sync::atomic::Ordering::Relaxed);
executed_tx_order = tx_order;
self.executed_tx_order
.store(executed_tx_order, std::sync::atomic::Ordering::Relaxed);
let done = self.done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;

if done % 10000 == 0 {
Expand All @@ -346,7 +389,7 @@ impl ExecInner {
}
tracing::info!(
"All transactions execution state root are strictly equal to RoochNetwork: [0, {}]",
verified_tx_order
executed_tx_order
);
Ok(())
}
Expand Down Expand Up @@ -416,6 +459,10 @@ impl ExecInner {
*expected_root, root.state_root.unwrap()
));
}
tracing::info!(
"Execution state root is equal to RoochNetwork: tx_order: {}",
tx_order
);
Ok(())
}
None => Ok(()),
Expand Down
10 changes: 8 additions & 2 deletions crates/rooch/src/commands/da/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ pub mod unpack;
// each segment is stored in a file named by the segment_id.
// each chunk may contain multiple segments.
// we collect all the chunks and their segment numbers to unpack them later.
pub(crate) fn collect_chunks(segment_dir: PathBuf) -> anyhow::Result<HashMap<u128, Vec<u64>>> {
pub(crate) fn collect_chunks(
segment_dir: PathBuf,
) -> anyhow::Result<(HashMap<u128, Vec<u64>>, u128)> {
let mut chunks = HashMap::new();
let mut max_chunk_id = 0;
for entry in fs::read_dir(segment_dir)?.flatten() {
let path = entry.path();
if path.is_file() {
Expand All @@ -29,10 +32,13 @@ pub(crate) fn collect_chunks(segment_dir: PathBuf) -> anyhow::Result<HashMap<u12
let segment_number = segment_id.segment_number;
let segments: &mut Vec<u64> = chunks.entry(chunk_id).or_default();
segments.push(segment_number);
if chunk_id > max_chunk_id {
max_chunk_id = chunk_id;
}
}
}
}
Ok(chunks)
Ok((chunks, max_chunk_id))
}

pub(crate) fn get_tx_list_from_chunk(
Expand Down
2 changes: 1 addition & 1 deletion crates/rooch/src/commands/da/commands/unpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl UnpackInner {
}

fn collect_chunks(&mut self) -> anyhow::Result<()> {
let chunks = collect_chunks(self.segment_dir.clone())?;
let (chunks, _max_chunk_id) = collect_chunks(self.segment_dir.clone())?;
self.chunks = chunks;
Ok(())
}
Expand Down
38 changes: 38 additions & 0 deletions crates/rooch/src/commands/db/commands/get_changeset_by_order.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) RoochNetwork
// SPDX-License-Identifier: Apache-2.0

use crate::commands::db::commands::init;
use clap::Parser;
use rooch_config::R_OPT_NET_HELP;
use rooch_store::state_store::StateStore;
use rooch_types::error::RoochResult;
use rooch_types::rooch_network::RoochChainID;
use std::path::PathBuf;

/// Get changeset by order
#[derive(Debug, Parser)]
pub struct GetChangesetByOrderCommand {
#[clap(long)]
pub order: u64,

#[clap(long = "data-dir", short = 'd')]
/// Path to data dir, this dir is base dir, the final data_dir is base_dir/chain_network_name
pub base_data_dir: Option<PathBuf>,

/// If local chainid, start the service with a temporary data store.
/// All data will be deleted when the service is stopped.
#[clap(long, short = 'n', help = R_OPT_NET_HELP)]
pub chain_id: Option<RoochChainID>,
}

impl GetChangesetByOrderCommand {
pub async fn execute(self) -> RoochResult<()> {
let (_root, rooch_db, _start_time) = init(self.base_data_dir, self.chain_id);
let rooch_store = rooch_db.rooch_store;
let tx_order = self.order;
let state_change_set_ext_opt = rooch_store.get_state_change_set(tx_order)?;
println!("{:?}", state_change_set_ext_opt);

Ok(())
}
}
1 change: 1 addition & 0 deletions crates/rooch/src/commands/db/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::path::PathBuf;
use std::time::SystemTime;

pub mod drop;
pub mod get_changeset_by_order;
pub mod repair;
pub mod revert;
pub mod rollback;
Expand Down
7 changes: 7 additions & 0 deletions crates/rooch/src/commands/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::cli_types::CommandAction;
use crate::commands::db::commands::drop::DropCommand;
use crate::commands::db::commands::get_changeset_by_order::GetChangesetByOrderCommand;
use crate::commands::db::commands::repair::RepairCommand;
use crate::commands::db::commands::revert::RevertCommand;
use async_trait::async_trait;
Expand Down Expand Up @@ -35,6 +36,11 @@ impl CommandAction<String> for DB {
DBCommand::Repair(repair) => repair.execute().await.map(|resp| {
serde_json::to_string_pretty(&resp).expect("Failed to serialize response")
}),
DBCommand::GetChangesetByOrder(get_changeset_by_order) => {
get_changeset_by_order.execute().await.map(|resp| {
serde_json::to_string_pretty(&resp).expect("Failed to serialize response")
})
}
}
}
}
Expand All @@ -46,4 +52,5 @@ pub enum DBCommand {
Rollback(RollbackCommand),
Drop(DropCommand),
Repair(RepairCommand),
GetChangesetByOrder(GetChangesetByOrderCommand),
}
7 changes: 7 additions & 0 deletions moveos/moveos-config/src/store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ pub struct RocksdbConfig {
pub block_cache_size: u64,
#[clap(name = "rocksdb-block-size", long, help = "rocksdb block size")]
pub block_size: u64,
#[clap(
name = "rocksdb-enable-statistics",
long,
help = "rocksdb enable statistics"
)]
pub enable_statistics: bool,
}

impl RocksdbConfig {
Expand All @@ -77,6 +83,7 @@ impl Default for RocksdbConfig {
max_write_buffer_numer: 4,
block_cache_size: 1u64 << 32,
block_size: 4 * 1024,
enable_statistics: false,
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions moveos/raw-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ impl RocksDB {
db_opts.set_row_cache(&cache);
db_opts.set_enable_pipelined_write(true);
db_opts.set_wal_recovery_mode(DBRecoveryMode::PointInTime); // for memtable crash recovery
db_opts.enable_statistics();
db_opts.set_statistics_level(statistics::StatsLevel::ExceptTimeForMutex);
if config.enable_statistics {
db_opts.enable_statistics();
db_opts.set_statistics_level(statistics::StatsLevel::ExceptTimeForMutex);
}
db_opts
}
fn iter_with_direction<K, V>(
Expand Down

0 comments on commit 31d8f81

Please sign in to comment.