Skip to content

Commit

Permalink
feat(rooch-da): ensure batch continuity and fix tx order error (#3197)
Browse files Browse the repository at this point in the history
Refactored BatchMaker to validate transaction order continuity, preventing mismatches between tx_order_start and the last block's tx_order_end. Introduced better error handling during batch creation to avoid database update inconsistencies. Updated tests to confirm correct behavior under the new logic.
  • Loading branch information
popcnt1 authored Jan 16, 2025
1 parent 2f106fa commit 9a354e9
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 61 deletions.
9 changes: 3 additions & 6 deletions crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DAServerActor {
last_block_number,
last_block_update_time: 0,
background_last_block_update_time: background_last_block_update_time.clone(),
batch_maker: BatchMaker::new(),
batch_maker: BatchMaker::new(rooch_store.clone()),
};

if submit_threshold != 0 {
Expand Down Expand Up @@ -137,11 +137,8 @@ impl DAServerActor {
) -> anyhow::Result<()> {
let tx_order = msg.tx_order;
let tx_timestamp = msg.tx_timestamp;
let batch = self.batch_maker.append_transaction(tx_order, tx_timestamp);
if let Some((tx_order_start, tx_order_end)) = batch {
let block_number = self
.rooch_store
.append_submitting_block(tx_order_start, tx_order_end)?;
let block_number_opt = self.batch_maker.append_transaction(tx_order, tx_timestamp);
if let Some(block_number) = block_number_opt {
self.last_block_number = Some(block_number);
self.last_block_update_time = SystemTime::now()
.duration_since(time::UNIX_EPOCH)?
Expand Down
170 changes: 115 additions & 55 deletions crates/rooch-da/src/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,112 @@
// SPDX-License-Identifier: Apache-2.0

use rooch_config::settings::ROOCH_BATCH_INTERVAL;
use rooch_store::da_store::DAMetaStore;
use rooch_store::RoochStore;

pub struct BatchMaker {
pub tx_order_start: u64,
pub tx_order_end: u64,
pub start_timestamp: u64,
struct InProgressBatch {
tx_order_start: u64,
tx_order_end: u64,
start_timestamp: u64,
}

impl BatchMaker {
pub fn new() -> Self {
impl InProgressBatch {
fn init() -> Self {
Self {
tx_order_start: 0,
tx_order_end: 0,
start_timestamp: 0,
}
}

fn reset(&mut self) {
*self = Self::init();
}

// create a new batch with the first transaction
fn begin_with(&mut self, tx_order: u64, mut tx_timestamp: u64) {
if tx_timestamp == 0 {
tx_timestamp = 1;
tracing::warn!("tx_timestamp is 0, should not happen, set to 1");
}

self.tx_order_start = tx_order;
self.tx_order_end = tx_order;
self.start_timestamp = tx_timestamp;
}

// Append transaction to batch:
// 1. If the batch is empty(batch_start_time is 0), set the start time and order
// 1. If the batch is empty(batch_start_time is 0), reset for making a new batch
// 2. If the batch is not empty, check if the transaction is in the interval:
// 2.1 If the transaction is in the interval, update the end order
// 2.2 If the transaction is not in the interval, return the batch and reset the batch range
pub fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<(u64, u64)> {
// 1. If the transaction is in the interval, update tx_order_end
// 2. If the transaction is not in the interval, return tx range and wait for reset
fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<(u64, u64)> {
if self.start_timestamp == 0 {
self.tx_order_start = tx_order;
self.tx_order_end = tx_order;
self.start_timestamp = tx_timestamp;
self.begin_with(tx_order, tx_timestamp);
return None;
}

if tx_timestamp < self.start_timestamp || // avoid overflow caused by tx_timestamp - self.last_updated (clock goes back)
let last_tx_order_end = self.tx_order_end;
if tx_order != last_tx_order_end + 1 {
tracing::error!(
"failed to make new batch: transaction order is not continuous, last: {}, current: {}",
last_tx_order_end,
tx_order
);
return None;
}

self.tx_order_end = tx_order;

if tx_timestamp < self.start_timestamp || // backwards checking first, avoid overflow
tx_timestamp - self.start_timestamp < ROOCH_BATCH_INTERVAL
{
self.tx_order_end = tx_order;
return None;
}

let batch = (self.tx_order_start, self.tx_order_end);
self.tx_order_start = tx_order;
self.tx_order_end = tx_order;
self.start_timestamp = tx_timestamp;
Some(batch)
Some((self.tx_order_start, self.tx_order_end))
}
}

pub struct BatchMaker {
in_progress_batch: InProgressBatch,
rooch_store: RoochStore,
}

impl BatchMaker {
pub fn new(rooch_store: RoochStore) -> Self {
Self {
in_progress_batch: InProgressBatch::init(),
rooch_store,
}
}

// append transaction to the batch, return block number if a new batch is made
pub fn append_transaction(&mut self, tx_order: u64, tx_timestamp: u64) -> Option<u128> {
let order_range = self
.in_progress_batch
.append_transaction(tx_order, tx_timestamp);
if let Some((tx_order_start, tx_order_end)) = order_range {
match self
.rooch_store
.append_submitting_block(tx_order_start, tx_order_end)
{
Ok(block_number) => {
// Successfully appended, return the block number & reset the batch
self.in_progress_batch.reset();
return Some(block_number);
}
Err(e) => {
tracing::warn!(
"Failed to append submitting block for range ({}, {}): {}",
tx_order_start,
tx_order_end,
e
);
}
}
};
None
}
}

Expand All @@ -51,48 +116,43 @@ mod test {
use super::*;

#[test]
fn test_append_transaction() {
let mut batch_maker = BatchMaker::new();
let tx_order = 1;
let tx_timestamp = 1;
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);

let tx_order = 2;
let tx_timestamp = 2;
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);

let tx_order = 3;
let tx_timestamp = 3;
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);

let tx_order = 4;
let tx_timestamp = 4;
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);

let tx_order = 5;
let tx_timestamp = 1 + ROOCH_BATCH_INTERVAL;
fn test_in_progress_batch() {
let mut in_progress_batch = InProgressBatch::init();
assert_eq!(in_progress_batch.append_transaction(1, 1), None);

assert_eq!(in_progress_batch.append_transaction(2, 2), None);

assert_eq!(in_progress_batch.append_transaction(3, 3), None);

assert_eq!(in_progress_batch.append_transaction(4, 4), None);

assert_eq!(
in_progress_batch.append_transaction(5, 1 + ROOCH_BATCH_INTERVAL),
Some((1, 5))
);

assert_eq!(in_progress_batch.append_transaction(6, 6), None);

assert_eq!(in_progress_batch.append_transaction(7, 7), None);

assert_eq!(in_progress_batch.append_transaction(8, 8), None);

assert_eq!(
batch_maker.append_transaction(tx_order, tx_timestamp),
Some((1, 4))
in_progress_batch.append_transaction(9, 6 + ROOCH_BATCH_INTERVAL),
Some((1, 9))
);

let tx_order = 6;
let tx_timestamp = 6; // clock goes back
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);
in_progress_batch.reset();

assert_eq!(in_progress_batch.append_transaction(6, 6), None);

let tx_order = 7;
let tx_timestamp = 7;
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);
assert_eq!(in_progress_batch.append_transaction(7, 7), None);

let tx_order = 8;
let tx_timestamp = 8;
assert_eq!(batch_maker.append_transaction(tx_order, tx_timestamp), None);
assert_eq!(in_progress_batch.append_transaction(8, 8), None);

let tx_order = 9;
let tx_timestamp = 1 + ROOCH_BATCH_INTERVAL + ROOCH_BATCH_INTERVAL;
assert_eq!(
batch_maker.append_transaction(tx_order, tx_timestamp),
Some((5, 8))
in_progress_batch.append_transaction(9, 6 + ROOCH_BATCH_INTERVAL),
Some((6, 9))
);
}
}

0 comments on commit 9a354e9

Please sign in to comment.