From 9a354e9a97d5c710ba8df9d58792db19f0498898 Mon Sep 17 00:00:00 2001 From: popcnt <142196625+popcnt1@users.noreply.github.com> Date: Thu, 16 Jan 2025 11:12:34 +0800 Subject: [PATCH] feat(rooch-da): ensure batch continuity and fix tx order error (#3197) Refactored BatchMaker to validate transaction order continuity, preventing mismatches between tx_order_start and the last block's tx_order_end. Introduced better error handling during batch creation to avoid database update inconsistencies. Updated tests to confirm correct behavior under the new logic. --- crates/rooch-da/src/actor/server.rs | 9 +- crates/rooch-da/src/batcher/mod.rs | 170 +++++++++++++++++++--------- 2 files changed, 118 insertions(+), 61 deletions(-) diff --git a/crates/rooch-da/src/actor/server.rs b/crates/rooch-da/src/actor/server.rs index 7a13e0c05a..83a4efabd7 100644 --- a/crates/rooch-da/src/actor/server.rs +++ b/crates/rooch-da/src/actor/server.rs @@ -66,7 +66,7 @@ impl DAServerActor { last_block_number, last_block_update_time: 0, background_last_block_update_time: background_last_block_update_time.clone(), - batch_maker: BatchMaker::new(), + batch_maker: BatchMaker::new(rooch_store.clone()), }; if submit_threshold != 0 { @@ -137,11 +137,8 @@ impl DAServerActor { ) -> anyhow::Result<()> { let tx_order = msg.tx_order; let tx_timestamp = msg.tx_timestamp; - let batch = self.batch_maker.append_transaction(tx_order, tx_timestamp); - if let Some((tx_order_start, tx_order_end)) = batch { - let block_number = self - .rooch_store - .append_submitting_block(tx_order_start, tx_order_end)?; + let block_number_opt = self.batch_maker.append_transaction(tx_order, tx_timestamp); + if let Some(block_number) = block_number_opt { self.last_block_number = Some(block_number); self.last_block_update_time = SystemTime::now() .duration_since(time::UNIX_EPOCH)? diff --git a/crates/rooch-da/src/batcher/mod.rs b/crates/rooch-da/src/batcher/mod.rs index 98854b431a..8ba3e03e17 100644 --- a/crates/rooch-da/src/batcher/mod.rs +++ b/crates/rooch-da/src/batcher/mod.rs @@ -2,15 +2,17 @@ // SPDX-License-Identifier: Apache-2.0 use rooch_config::settings::ROOCH_BATCH_INTERVAL; +use rooch_store::da_store::DAMetaStore; +use rooch_store::RoochStore; -pub struct BatchMaker { - pub tx_order_start: u64, - pub tx_order_end: u64, - pub start_timestamp: u64, +struct InProgressBatch { + tx_order_start: u64, + tx_order_end: u64, + start_timestamp: u64, } -impl BatchMaker { - pub fn new() -> Self { +impl InProgressBatch { + fn init() -> Self { Self { tx_order_start: 0, tx_order_end: 0, @@ -18,31 +20,94 @@ impl BatchMaker { } } + fn reset(&mut self) { + *self = Self::init(); + } + + // create a new batch with the first transaction + fn begin_with(&mut self, tx_order: u64, mut tx_timestamp: u64) { + if tx_timestamp == 0 { + tx_timestamp = 1; + tracing::warn!("tx_timestamp is 0, should not happen, set to 1"); + } + + self.tx_order_start = tx_order; + self.tx_order_end = tx_order; + self.start_timestamp = tx_timestamp; + } + // Append transaction to batch: - // 1. If the batch is empty(batch_start_time is 0), set the start time and order + // 1. If the batch is empty(batch_start_time is 0), reset for making a new batch // 2. If the batch is not empty, check if the transaction is in the interval: - // 2.1 If the transaction is in the interval, update the end order - // 2.2 If the transaction is not in the interval, return the batch and reset the batch range - pub fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<(u64, u64)> { + // 1. If the transaction is in the interval, update tx_order_end + // 2. If the transaction is not in the interval, return tx range and wait for reset + fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<(u64, u64)> { if self.start_timestamp == 0 { - self.tx_order_start = tx_order; - self.tx_order_end = tx_order; - self.start_timestamp = tx_timestamp; + self.begin_with(tx_order, tx_timestamp); return None; } - if tx_timestamp < self.start_timestamp || // avoid overflow caused by tx_timestamp - self.last_updated (clock goes back) + let last_tx_order_end = self.tx_order_end; + if tx_order != last_tx_order_end + 1 { + tracing::error!( + "failed to make new batch: transaction order is not continuous, last: {}, current: {}", + last_tx_order_end, + tx_order + ); + return None; + } + + self.tx_order_end = tx_order; + + if tx_timestamp < self.start_timestamp || // backwards checking first, avoid overflow tx_timestamp - self.start_timestamp < ROOCH_BATCH_INTERVAL { - self.tx_order_end = tx_order; return None; } - let batch = (self.tx_order_start, self.tx_order_end); - self.tx_order_start = tx_order; - self.tx_order_end = tx_order; - self.start_timestamp = tx_timestamp; - Some(batch) + Some((self.tx_order_start, self.tx_order_end)) + } +} + +pub struct BatchMaker { + in_progress_batch: InProgressBatch, + rooch_store: RoochStore, +} + +impl BatchMaker { + pub fn new(rooch_store: RoochStore) -> Self { + Self { + in_progress_batch: InProgressBatch::init(), + rooch_store, + } + } + + // append transaction to the batch, return block number if a new batch is made + pub fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option { + let order_range = self + .in_progress_batch + .append_transaction(tx_order, tx_timestamp); + if let Some((tx_order_start, tx_order_end)) = order_range { + match self + .rooch_store + .append_submitting_block(tx_order_start, tx_order_end) + { + Ok(block_number) => { + // Successfully appended, return the block number & reset the batch + self.in_progress_batch.reset(); + return Some(block_number); + } + Err(e) => { + tracing::warn!( + "Failed to append submitting block for range ({}, {}): {}", + tx_order_start, + tx_order_end, + e + ); + } + } + }; + None } } @@ -51,48 +116,43 @@ mod test { use super::*; #[test] - fn test_append_transaction() { - let mut batch_maker = BatchMaker::new(); - let tx_order = 1; - let tx_timestamp = 1; - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); - - let tx_order = 2; - let tx_timestamp = 2; - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); - - let tx_order = 3; - let tx_timestamp = 3; - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); - - let tx_order = 4; - let tx_timestamp = 4; - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); - - let tx_order = 5; - let tx_timestamp = 1 + ROOCH_BATCH_INTERVAL; + fn test_in_progress_batch() { + let mut in_progress_batch = InProgressBatch::init(); + assert_eq!(in_progress_batch.append_transaction(1, 1), None); + + assert_eq!(in_progress_batch.append_transaction(2, 2), None); + + assert_eq!(in_progress_batch.append_transaction(3, 3), None); + + assert_eq!(in_progress_batch.append_transaction(4, 4), None); + + assert_eq!( + in_progress_batch.append_transaction(5, 1 + ROOCH_BATCH_INTERVAL), + Some((1, 5)) + ); + + assert_eq!(in_progress_batch.append_transaction(6, 6), None); + + assert_eq!(in_progress_batch.append_transaction(7, 7), None); + + assert_eq!(in_progress_batch.append_transaction(8, 8), None); + assert_eq!( - batch_maker.append_transaction(tx_order, tx_timestamp), - Some((1, 4)) + in_progress_batch.append_transaction(9, 6 + ROOCH_BATCH_INTERVAL), + Some((1, 9)) ); - let tx_order = 6; - let tx_timestamp = 6; // clock goes back - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); + in_progress_batch.reset(); + + assert_eq!(in_progress_batch.append_transaction(6, 6), None); - let tx_order = 7; - let tx_timestamp = 7; - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); + assert_eq!(in_progress_batch.append_transaction(7, 7), None); - let tx_order = 8; - let tx_timestamp = 8; - assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None); + assert_eq!(in_progress_batch.append_transaction(8, 8), None); - let tx_order = 9; - let tx_timestamp = 1 + ROOCH_BATCH_INTERVAL + ROOCH_BATCH_INTERVAL; assert_eq!( - batch_maker.append_transaction(tx_order, tx_timestamp), - Some((5, 8)) + in_progress_batch.append_transaction(9, 6 + ROOCH_BATCH_INTERVAL), + Some((6, 9)) ); } }