From 2d196425d46ac937f8dbeb3d7b7dae4bfaf8fcf3 Mon Sep 17 00:00:00 2001
From: Maksim Greshnyakov
Date: Sun, 26 Jan 2025 16:04:31 +0100
Subject: [PATCH] refactor(collator): refactor iterator ranges

---
 block-util/src/queue/proto.rs                 | 106 ++++++++++++++++++
 .../internal_queue/state/commited_state.rs    |  14 ++-
 .../internal_queue/state/shard_iterator.rs    |  59 +---------
 .../internal_queue/state/state_iterator.rs    |  20 +---
 .../internal_queue/state/uncommitted_state.rs |  17 ++-
 collator/tests/internal_queue.rs              |   8 +-
 scripts/gen-dashboard.py                      |   2 +-
 storage/src/store/internal_queue/mod.rs       |  31 ++++-
 8 files changed, 170 insertions(+), 87 deletions(-)

diff --git a/block-util/src/queue/proto.rs b/block-util/src/queue/proto.rs
index 362efbdbb..ede7a86e6 100644
--- a/block-util/src/queue/proto.rs
+++ b/block-util/src/queue/proto.rs
@@ -191,6 +191,31 @@ impl QueueKey {
         }
     }
 
+    // Add the minimal step to the key (increment the hash, carrying into `lt`)
+    pub fn next_value(&self) -> Self {
+        let mut new_lt = self.lt;
+        let mut new_hash = self.hash;
+
+        if new_hash.0 == [0xff; 32] {
+            new_lt += 1;
+            new_hash = HashBytes::ZERO;
+        } else {
+            let carry = 1u8;
+            for byte in new_hash.0.iter_mut().rev() {
+                let (res, overflow) = byte.overflowing_add(carry);
+                *byte = res;
+                if !overflow {
+                    break;
+                }
+            }
+        }
+
+        Self {
+            lt: new_lt,
+            hash: new_hash,
+        }
+    }
+
     #[inline]
     pub const fn split(self) -> (u64, HashBytes) {
         (self.lt, self.hash)
@@ -747,4 +772,85 @@ mod tests {
         let parsed = tl_proto::deserialize::(&bytes).unwrap();
         assert_eq!(state, parsed);
     }
+
+    #[test]
+    fn test_next_value() {
+        // 1) Check increment when hash is all zeros
+        let key_zero = QueueKey {
+            lt: 5,
+            hash: HashBytes::ZERO,
+        };
+        let next_zero = key_zero.next_value();
+        // Expect that lt remains unchanged and hash is [0..0, 1]
+        assert_eq!(
+            next_zero.lt, 5,
+            "LT must remain the same if hash is not all 0xFF"
+        );
+        let mut expected_hash_zero = [0u8; 32];
+        expected_hash_zero[31] = 1;
+        assert_eq!(
+            next_zero.hash.0, expected_hash_zero,
+            "Hash should increment by 1"
+        );
+
+        // 2) Check increment when hash has partial 0xFF at the end,
+        // e.g., last two bytes are 0xFF, but not the whole array
+        let mut partial_ff = [0u8; 32];
+        partial_ff[30] = 0xFF;
+        partial_ff[31] = 0xFF;
+        let key_partial_ff = QueueKey {
+            lt: 10,
+            hash: HashBytes(partial_ff),
+        };
+        let next_partial_ff = key_partial_ff.next_value();
+        // Expected result: carry rolls over the last two 0xFF bytes
+        // and increments the next byte
+        let mut expected_hash_partial = [0u8; 32];
+        expected_hash_partial[29] = 0x01; // incremented by carry
+        // bytes 30, 31 become 0x00
+        assert_eq!(
+            next_partial_ff.lt, 10,
+            "LT must remain the same with partial 0xFF"
+        );
+        assert_eq!(
+            next_partial_ff.hash.0, expected_hash_partial,
+            "Hash should be incremented correctly with carry"
+        );
+
+        // 3) Check increment when hash is fully 0xFF
+        let key_full_ff = QueueKey {
+            lt: 999,
+            hash: HashBytes([0xFF; 32]),
+        };
+        let next_full_ff = key_full_ff.next_value();
+        // Expect that hash resets to zero and LT increments by 1
+        assert_eq!(
+            next_full_ff.lt, 1000,
+            "LT must increment if hash was all 0xFF"
+        );
+        assert_eq!(next_full_ff.hash.0, [0u8; 32], "Hash should reset to zero");
+
+        // 4) A quick check of a mid-range increment with carry:
+        // Example: [.., 0x01, 0xFF, 0xFF]
+        let mut mid_hash = [0u8; 32];
+        mid_hash[29] = 0x01;
+        mid_hash[30] = 0xFF;
+        mid_hash[31] = 0xFF;
+        let key_mid = QueueKey {
+            lt: 50,
+            hash: HashBytes(mid_hash),
+        };
+        let next_mid = key_mid.next_value();
+        // We expect that byte 29 increments to 0x02 and the last two bytes become 0x00
+        let mut expected_mid_hash = [0u8; 32];
+        expected_mid_hash[29] = 0x02;
+        assert_eq!(
+            next_mid.lt, 50,
+            "LT should remain the same for a mid-range carry"
+        );
+        assert_eq!(
+            next_mid.hash.0, expected_mid_hash,
+            "Hash should increment the correct byte after partial 0xFF"
+        );
+    }
 }
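The increment above behaves like adding 1 to a single 320-bit big-endian integer, with `lt` as the high 64 bits and the hash as the low 256 bits. A minimal standalone sketch of the same carry logic, with a plain `(u64, [u8; 32])` pair standing in for `QueueKey`/`HashBytes`:

```rust
// Sketch: `next_value` as +1 on a 320-bit big-endian integer.
// A plain (u64, [u8; 32]) stands in for (QueueKey::lt, HashBytes).
fn next_value(lt: u64, mut hash: [u8; 32]) -> (u64, [u8; 32]) {
    if hash == [0xff; 32] {
        // The hash saturates: reset it and carry into `lt`.
        return (lt + 1, [0u8; 32]);
    }
    // Add 1 to the big-endian hash, propagating the carry from the last byte.
    for byte in hash.iter_mut().rev() {
        let (res, overflow) = byte.overflowing_add(1);
        *byte = res;
        if !overflow {
            break;
        }
    }
    (lt, hash)
}

fn main() {
    // Only the last byte changes when there is no carry.
    assert_eq!(next_value(5, [0u8; 32]).1[31], 1);
    // A fully saturated hash rolls over into `lt`.
    assert_eq!(next_value(999, [0xff; 32]), (1000, [0u8; 32]));
}
```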
diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs
index 22c2e125c..6aad7ced4 100644
--- a/collator/src/internal_queue/state/commited_state.rs
+++ b/collator/src/internal_queue/state/commited_state.rs
@@ -1,6 +1,7 @@
 use anyhow::Result;
 use everscale_types::models::{IntAddr, ShardIdent};
 use tycho_block_util::queue::QueuePartitionIdx;
+use tycho_storage::model::ShardsInternalMessagesKey;
 use tycho_storage::{InternalQueueSnapshot, Storage};
 use tycho_util::metrics::HistogramGuard;
 use tycho_util::FastHashMap;
@@ -106,14 +107,19 @@ impl CommittedState for CommittedStateStdImpl {
         partition: QueuePartitionIdx,
         ranges: &[QueueShardRange],
     ) -> Result<Box<dyn StateIterator<V>>> {
-        let mut shard_iters_with_ranges = Vec::new();
+        let mut shards_iters = Vec::new();
 
         for range in ranges {
            // make `from` exclusive: start one step after it
+            let from_key = range.from.next_value();
+            let from = ShardsInternalMessagesKey::new(partition, range.shard_ident, from_key);
+            // keep `to` inclusive: the upper bound itself is exclusive, so shift it one step
+            let to_key = range.to.next_value();
+            let to = ShardsInternalMessagesKey::new(partition, range.shard_ident, to_key);
-            let iter = snapshot.iter_messages_commited();
-            shard_iters_with_ranges.push((iter, range.clone()));
+            shards_iters.push((snapshot.iter_messages_commited(from, to), range.shard_ident));
         }
 
-        let iterator = StateIteratorImpl::new(partition, shard_iters_with_ranges, receiver)?;
+        let iterator = StateIteratorImpl::new(shards_iters, receiver)?;
         Ok(Box::new(iterator))
     }

diff --git a/collator/src/internal_queue/state/shard_iterator.rs b/collator/src/internal_queue/state/shard_iterator.rs
index deacbda60..38fb614a4 100644
--- a/collator/src/internal_queue/state/shard_iterator.rs
+++ b/collator/src/internal_queue/state/shard_iterator.rs
@@ -1,67 +1,25 @@
 use anyhow::Result;
 use everscale_types::models::ShardIdent;
-use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
-use tycho_storage::model::ShardsInternalMessagesKey;
+use tycho_block_util::queue::QueueKey;
 use tycho_storage::InternalQueueMessagesIter;
 
 use crate::types::ShortAddr;
 
-#[derive(Clone, Debug)]
-struct Range {
-    from: ShardsInternalMessagesKey,
-    to: ShardsInternalMessagesKey,
-}
-
-impl Range {
-    pub fn contains(&self, key: &ShardsInternalMessagesKey) -> bool {
-        key > &self.from && key <= &self.to
-    }
-}
-
-impl From<(QueuePartitionIdx, ShardIdent, QueueKey, QueueKey)> for Range {
-    fn from(value: (QueuePartitionIdx, ShardIdent, QueueKey, QueueKey)) -> Self {
-        let (partition, shard_ident, from, to) = value;
-
-        let from = ShardsInternalMessagesKey::new(partition, shard_ident, from);
-        let to = ShardsInternalMessagesKey::new(partition, shard_ident, to);
-
-        Range { from, to }
-    }
-}
-
 pub enum IterResult<'a> {
     Value(&'a [u8]),
     Skip(Option<(ShardIdent, QueueKey)>),
 }
 
 pub struct ShardIterator {
-    range: Range,
     receiver: ShardIdent,
     iterator: InternalQueueMessagesIter,
 }
 
 impl ShardIterator {
-    pub fn new(
-        partition: QueuePartitionIdx,
-        shard_ident: ShardIdent,
-        from: QueueKey,
-        to: QueueKey,
-        receiver: ShardIdent,
-        mut iterator: InternalQueueMessagesIter,
-    ) -> Self {
-        iterator.seek(&ShardsInternalMessagesKey::new(
-            partition,
-            shard_ident,
-            from,
-        ));
-
-        let range = Range::from((partition, shard_ident, from, to));
+    pub fn new(receiver: ShardIdent, mut iterator: InternalQueueMessagesIter) -> Self {
+        iterator.seek_to_first();
 
-        Self {
-            range,
-            receiver,
-            iterator,
-        }
+        Self { receiver, iterator }
     }
 
     #[allow(clippy::should_implement_trait)]
@@ -70,15 +28,6 @@ impl ShardIterator {
             return Ok(None);
         };
 
-        // skip first key if it is equal to `from`
-        if msg.key == self.range.from {
-            return Ok(Some(IterResult::Skip(None)));
-        }
-
-        if !self.range.contains(&msg.key) {
-            return Ok(None);
-        }
-
         let short_addr = ShortAddr::new(msg.workchain as i32, msg.prefix);
 
         Ok(Some(if self.receiver.contains_address(&short_addr) {
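The deleted `Range::contains` treated `from` as exclusive and `to` as inclusive. The refactor obtains the same window from a half-open interval by shifting both bounds one step forward with `next_value()`; on plain integers, the equivalence it relies on looks like this:

```rust
// For any key: (from, to]  ==  [from + 1, to + 1),
// which is what passing `range.from.next_value()` / `range.to.next_value()`
// as inclusive-lower / exclusive-upper bounds encodes.
fn main() {
    let (from, to) = (10u64, 15u64);
    for key in 0u64..30 {
        let exclusive_from_inclusive_to = key > from && key <= to; // old check
        let shifted_half_open = (from + 1..to + 1).contains(&key); // new bounds
        assert_eq!(exclusive_from_inclusive_to, shifted_half_open);
    }
}
```

This is also why `ShardIterator::new` can simply `seek_to_first()`: the per-message `from`-equality and `contains` checks move into the iterator's own bounds.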
diff --git a/collator/src/internal_queue/state/state_iterator.rs b/collator/src/internal_queue/state/state_iterator.rs
index c35d7751f..65b25f2ec 100644
--- a/collator/src/internal_queue/state/state_iterator.rs
+++ b/collator/src/internal_queue/state/state_iterator.rs
@@ -6,12 +6,12 @@ use std::sync::Arc;
 use ahash::HashMapExt;
 use anyhow::{bail, Context, Result};
 use everscale_types::models::ShardIdent;
-use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
+use tycho_block_util::queue::QueueKey;
 use tycho_storage::InternalQueueMessagesIter;
 use tycho_util::FastHashMap;
 
 use crate::internal_queue::state::shard_iterator::{IterResult, ShardIterator};
-use crate::internal_queue::types::{InternalMessageValue, QueueShardRange};
+use crate::internal_queue::types::InternalMessageValue;
 
 pub struct ShardIteratorWithRange {
     pub iter: InternalQueueMessagesIter,
@@ -86,21 +86,13 @@ pub struct StateIteratorImpl {
 
 impl StateIteratorImpl {
     pub fn new(
-        partition: QueuePartitionIdx,
-        shard_iters_with_ranges: Vec<(InternalQueueMessagesIter, QueueShardRange)>,
+        shard_iters: Vec<(InternalQueueMessagesIter, ShardIdent)>,
         receiver: ShardIdent,
     ) -> Result<Self> {
-        let mut iters = FastHashMap::with_capacity(shard_iters_with_ranges.len());
+        let mut iters = FastHashMap::with_capacity(shard_iters.len());
 
-        for (iter, range) in shard_iters_with_ranges {
-            let QueueShardRange {
-                shard_ident,
-                from,
-                to,
-            } = range;
-
-            let shard_iterator =
-                ShardIterator::new(partition, shard_ident, from, to, receiver, iter);
+        for (iter, shard_ident) in shard_iters {
+            let shard_iterator = ShardIterator::new(receiver, iter);
 
             match iters.entry(shard_ident) {
                 Entry::Occupied(_) => {

diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs
index 44928a58a..126a0d9ed 100644
--- a/collator/src/internal_queue/state/uncommitted_state.rs
+++ b/collator/src/internal_queue/state/uncommitted_state.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use anyhow::Result;
 use everscale_types::models::{IntAddr, ShardIdent};
 use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
-use tycho_storage::model::{QueueRange, StatKey};
+use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey};
 use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage};
 use tycho_util::metrics::HistogramGuard;
 use tycho_util::{FastHashMap, FastHashSet};
@@ -120,13 +120,22 @@ impl UncommittedState for UncommittedStateStdImpl {
         partition: QueuePartitionIdx,
         ranges: &[QueueShardRange],
     ) -> Result<Box<dyn StateIterator<V>>> {
-        let mut shard_iters_with_ranges = Vec::new();
+        let mut shards_iters = Vec::new();
 
         for range in ranges {
-            shard_iters_with_ranges.push((snapshot.iter_messages_uncommited(), range.clone()));
+            // make `from` exclusive: start one step after it
+            let from_key = range.from.next_value();
+            let from = ShardsInternalMessagesKey::new(partition, range.shard_ident, from_key);
+            // keep `to` inclusive: the upper bound itself is exclusive, so shift it one step
+            let to_key = range.to.next_value();
+            let to = ShardsInternalMessagesKey::new(partition, range.shard_ident, to_key);
+            shards_iters.push((
+                snapshot.iter_messages_uncommited(from, to),
+                range.shard_ident,
+            ));
         }
 
-        let iterator = StateIteratorImpl::new(partition, shard_iters_with_ranges, receiver)?;
+        let iterator = StateIteratorImpl::new(shards_iters, receiver)?;
         Ok(Box::new(iterator))
     }
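The tail of the `StateIteratorImpl::new` hunk is cut off above, but the `bail` import and the `Entry::Occupied(_)` arm suggest it rejects a second range for the same shard. A hypothetical sketch of that guard, with `u32` and a unit struct standing in for `ShardIdent` and `ShardIterator`:

```rust
use std::collections::hash_map::{Entry, HashMap};

// One iterator per shard: a second range for the same shard is an error.
// `ShardId`/`Iter` are stand-ins for `ShardIdent`/`ShardIterator`.
type ShardId = u32;
struct Iter;

fn build(iters_in: Vec<(Iter, ShardId)>) -> Result<HashMap<ShardId, Iter>, String> {
    let mut iters = HashMap::with_capacity(iters_in.len());
    for (iter, shard) in iters_in {
        match iters.entry(shard) {
            // A shard already has an iterator: reject the duplicate range.
            Entry::Occupied(_) => return Err(format!("duplicate shard range: {shard}")),
            Entry::Vacant(v) => {
                v.insert(iter);
            }
        }
    }
    Ok(iters)
}

fn main() {
    assert!(build(vec![(Iter, 1), (Iter, 2)]).is_ok());
    assert!(build(vec![(Iter, 1), (Iter, 1)]).is_err());
}
```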
diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs
index 2200d28e3..8d8b8e273 100644
--- a/collator/tests/internal_queue.rs
+++ b/collator/tests/internal_queue.rs
@@ -454,11 +454,11 @@ async fn test_queue() -> anyhow::Result<()> {
     let queue_range = QueueShardRange {
         shard_ident: ShardIdent::new_full(0),
         from: QueueKey {
-            lt: 0,
+            lt: 10000,
             hash: HashBytes::default(),
         },
         to: QueueKey {
-            lt: 16000,
+            lt: 15500,
            hash: HashBytes::default(),
         },
     };
@@ -472,7 +472,7 @@ async fn test_queue() -> anyhow::Result<()> {
     while iterator_manager.next()?.is_some() {
         read_count += 1;
     }
-    assert_eq!(read_count, 15000);
+    assert_eq!(read_count, 5000);
 
     let iterators = queue.iterator(
         QueuePartitionIdx::default(),
@@ -485,7 +485,7 @@ async fn test_queue() -> anyhow::Result<()> {
         read_count += 1;
     }
 
-    assert_eq!(read_count, 1000);
+    assert_eq!(read_count, 500);
 
     // check two diff iterator
     let mut ranges = Vec::new();

diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py
index 9323e53bf..aca6e76df 100644
--- a/scripts/gen-dashboard.py
+++ b/scripts/gen-dashboard.py
@@ -1554,7 +1554,7 @@ def collator_core_operations_metrics() -> RowPanel:
         ),
         create_heatmap_panel(
             "tycho_do_collate_build_statistics_time",
-            "Build statistics",
+            "async Apply message queue diff: inc. Build statistics",
             labels=['workchain=~"$workchain"'],
         ),
         create_heatmap_panel(
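The storage diff that follows feeds the shifted keys into RocksDB read options, where the lower iterate bound is inclusive and the upper bound is exclusive, which is why both keys are pre-shifted with `next_value()`. A sketch of those bound semantics using the `rocksdb` crate directly (the actual code goes through its own `Table`/read-config wrapper); the path and keys here are purely illustrative:

```rust
// Demonstrates RocksDB iterate bounds: lower is inclusive, upper is exclusive,
// so these options yield exactly the keys in ["k2", "k4"), i.e. k2 and k3.
use rocksdb::{IteratorMode, Options, ReadOptions, DB};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/bounds-demo")?;
    for key in [b"k1", b"k2", b"k3", b"k4"] {
        db.put(key, b"v")?;
    }

    let mut read_opts = ReadOptions::default();
    read_opts.set_iterate_lower_bound(b"k2".to_vec()); // inclusive
    read_opts.set_iterate_upper_bound(b"k4".to_vec()); // exclusive

    let keys: Vec<_> = db
        .iterator_opt(IteratorMode::Start, read_opts)
        .map(|item| item.map(|(k, _)| k.to_vec()))
        .collect::<Result<_, _>>()?;
    assert_eq!(keys, vec![b"k2".to_vec(), b"k3".to_vec()]);
    Ok(())
}
```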
diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs
index 8026bd485..13cee4188 100644
--- a/storage/src/store/internal_queue/mod.rs
+++ b/storage/src/store/internal_queue/mod.rs
@@ -414,18 +414,34 @@ pub struct InternalQueueSnapshot {
 }
 
 impl InternalQueueSnapshot {
-    pub fn iter_messages_commited(&self) -> InternalQueueMessagesIter {
-        self.iter_messages(&self.db.shard_internal_messages)
+    pub fn iter_messages_commited(
+        &self,
+        from: ShardsInternalMessagesKey,
+        to: ShardsInternalMessagesKey,
+    ) -> InternalQueueMessagesIter {
+        self.iter_messages(&self.db.shard_internal_messages, from, to)
     }
 
-    pub fn iter_messages_uncommited(&self) -> InternalQueueMessagesIter {
-        self.iter_messages(&self.db.shard_internal_messages_uncommitted)
+    pub fn iter_messages_uncommited(
+        &self,
+        from: ShardsInternalMessagesKey,
+        to: ShardsInternalMessagesKey,
+    ) -> InternalQueueMessagesIter {
+        self.iter_messages(&self.db.shard_internal_messages_uncommitted, from, to)
     }
 
-    fn iter_messages(&self, table: &Table) -> InternalQueueMessagesIter {
+    fn iter_messages(
+        &self,
+        table: &Table,
+        from: ShardsInternalMessagesKey,
+        to: ShardsInternalMessagesKey,
+    ) -> InternalQueueMessagesIter {
         let mut read_config = table.new_read_config();
         read_config.set_snapshot(&self.snapshot);
+        read_config.set_iterate_lower_bound(from.to_vec().to_vec());
+        read_config.set_iterate_upper_bound(to.to_vec().to_vec());
+
         let db = self.db.rocksdb();
         let iter = db.raw_iterator_cf_opt(&table.cf(), read_config);
@@ -528,6 +544,11 @@ impl InternalQueueMessagesIter {
         self.first = true;
     }
 
+    pub fn seek_to_first(&mut self) {
+        self.inner.seek_to_first();
+        self.first = true;
+    }
+
     #[allow(clippy::should_implement_trait)]
     pub fn next(&mut self) -> Result>> {
         if !std::mem::take(&mut self.first) {