Skip to content

Commit

Permalink
fix(openda): improve chunk submission tracking and handle optional la…
Browse files Browse the repository at this point in the history
…test_done_chunk_id compatibility (#3232)

Revamped `AdapterSubmitStat` to track submitted segments using a vector instead of the sum, ensuring accurate submission tracking. Adjusted `get_latest_done_chunk_id` to return `Option<u128>` and implemented a fallback for compatibility with older versions. Updated tests accordingly.

Co-authored-by: popcnt1 <[email protected]>
  • Loading branch information
popcnt1 and popcnt1 authored Jan 25, 2025
1 parent 716aab4 commit 3306c1b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 20 deletions.
1 change: 1 addition & 0 deletions crates/rooch-da/src/actor/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl DAServerActor {
// (it's block number too for ChunkV0, which is the only version now)
let future = stat.get_latest_done_chunk_id();
let result = future.await; // Resolve the future
let result = result.unwrap_or_else(|| 0); // for compatibility with old version which doesn't have this Optional field
avail_backends.push((identifier, result));
}

Expand Down
63 changes: 43 additions & 20 deletions crates/rooch-da/src/backend/openda/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct AdapterSubmitStat {

struct AdapterSubmitStatInner {
chunk_id: u128,
segment_number_sum: u64,
submitted_set: Vec<u64>,
latest_done_chunk_id: u128,
}

Expand All @@ -54,35 +54,58 @@ impl AdapterSubmitStat {
Self {
inner: Arc::new(RwLock::new(AdapterSubmitStatInner {
chunk_id: 0,
segment_number_sum: 0,
latest_done_chunk_id: 0,
submitted_set: Vec::new(),
latest_done_chunk_id: u128::MAX,
})),
}
}

// append segment_number to submitted_set
// if segment_number is not in submitted_set, append it
// return the number of segments added
fn append_submitted_to_chunk(submitted_set: &mut Vec<u64>, segment_number: u64) -> usize {
if submitted_set.is_empty() {
submitted_set.push(segment_number);
return 1;
}
let mut has_inserted = false;
for num in submitted_set.iter() {
if *num == segment_number {
has_inserted = true;
break;
}
}
if !has_inserted {
submitted_set.push(segment_number);
}
submitted_set.len()
}

pub async fn add_done_segment(&self, segment_id: SegmentID, is_last_segment: bool) {
let mut inner = self.inner.write().await;
if segment_id.chunk_id != inner.chunk_id {
// new chunk
inner.segment_number_sum = 0;
inner.submitted_set.clear();
inner.chunk_id = segment_id.chunk_id;
}
inner.segment_number_sum += segment_id.segment_number;
let seg_num = segment_id.segment_number;
let submitted = Self::append_submitted_to_chunk(&mut inner.submitted_set, seg_num);
if is_last_segment {
let mut exp_segment_number_sum = 0;
for i in 0..=segment_id.segment_number {
exp_segment_number_sum += i;
}
if exp_segment_number_sum == inner.segment_number_sum {
// only accept segments added in order
inner.latest_done_chunk_id = inner.chunk_id;
let expected = seg_num + 1;
if submitted == expected as usize {
inner.latest_done_chunk_id = segment_id.chunk_id;
}
}
}

pub async fn get_latest_done_chunk_id(&self) -> u128 {
pub async fn get_latest_done_chunk_id(&self) -> Option<u128> {
let inner = self.inner.read().await;
inner.latest_done_chunk_id
let chunk_id = inner.latest_done_chunk_id;
if chunk_id == u128::MAX {
None
} else {
Some(chunk_id)
}
}
}

Expand Down Expand Up @@ -335,41 +358,41 @@ mod tests {
#[tokio::test]
async fn test_adapter_submit_stats() {
let stats = AdapterSubmitStat::new();
assert_eq!(stats.get_latest_done_chunk_id().await, 0);
assert_eq!(stats.get_latest_done_chunk_id().await, None);

let segment_id1 = SegmentID {
chunk_id: 1,
segment_number: 0,
};
stats.add_done_segment(segment_id1, false).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 0);
assert_eq!(stats.get_latest_done_chunk_id().await, None);

let segment_id2 = SegmentID {
chunk_id: 1,
segment_number: 1,
};
stats.add_done_segment(segment_id2, true).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 1);
assert_eq!(stats.get_latest_done_chunk_id().await, Some(1));

let segment_id3 = SegmentID {
chunk_id: 2,
segment_number: 0,
};
stats.add_done_segment(segment_id3, false).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 1);
assert_eq!(stats.get_latest_done_chunk_id().await, Some(1));

let segment_id4 = SegmentID {
chunk_id: 2,
segment_number: 1,
};
stats.add_done_segment(segment_id4, false).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 1);
assert_eq!(stats.get_latest_done_chunk_id().await, Some(1));

let segment_id5 = SegmentID {
chunk_id: 2,
segment_number: 2,
};
stats.add_done_segment(segment_id5, true).await;
assert_eq!(stats.get_latest_done_chunk_id().await, 2);
assert_eq!(stats.get_latest_done_chunk_id().await, Some(2));
}
}

0 comments on commit 3306c1b

Please sign in to comment.