Skip to content

Commit

Permalink
refactor(collator): refactor iterators range
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and SmaGMan committed Jan 29, 2025
1 parent 8a8af7a commit 2d19642
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 87 deletions.
106 changes: 106 additions & 0 deletions block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,31 @@ impl QueueKey {
}
}

/// Returns the successor of this key: the smallest key strictly greater
/// than `self` when keys are ordered by `(lt, hash)`.
///
/// The 32-byte hash is treated as a big-endian 256-bit integer and
/// incremented by one; if the hash is already saturated (all `0xFF`),
/// it wraps to zero and the carry is absorbed by bumping `lt`.
pub fn next_value(&self) -> Self {
    // Saturated hash: carry propagates out of the hash into `lt`.
    if self.hash.0 == [0xff; 32] {
        return Self {
            lt: self.lt + 1,
            hash: HashBytes::ZERO,
        };
    }

    // Otherwise add one to the big-endian hash. Walking from the least
    // significant (last) byte, a wrapped byte means the carry must keep
    // rippling; the first byte that does not wrap stops the propagation.
    // The loop always terminates early because the hash is not all 0xFF.
    let mut hash = self.hash;
    for byte in hash.0.iter_mut().rev() {
        let (incremented, wrapped) = byte.overflowing_add(1);
        *byte = incremented;
        if !wrapped {
            break;
        }
    }

    Self { lt: self.lt, hash }
}

#[inline]
pub const fn split(self) -> (u64, HashBytes) {
(self.lt, self.hash)
Expand Down Expand Up @@ -747,4 +772,85 @@ mod tests {
let parsed = tl_proto::deserialize::<QueueStateHeader>(&bytes).unwrap();
assert_eq!(state, parsed);
}

#[test]
fn test_next_value() {
    // Helper: a 32-byte hash that is zero everywhere except the given
    // (index, value) overrides.
    let hash_with = |overrides: &[(usize, u8)]| -> [u8; 32] {
        let mut bytes = [0u8; 32];
        for &(idx, val) in overrides {
            bytes[idx] = val;
        }
        bytes
    };

    // Case 1: all-zero hash — only the least significant byte is bumped,
    // LT stays put.
    let next = QueueKey {
        lt: 5,
        hash: HashBytes::ZERO,
    }
    .next_value();
    assert_eq!(
        next.lt, 5,
        "LT must remain the same if hash is not full 0xFF"
    );
    assert_eq!(
        next.hash.0,
        hash_with(&[(31, 1)]),
        "Hash should increment by 1"
    );

    // Case 2: two trailing 0xFF bytes — the carry ripples through them
    // and lands in byte 29, while bytes 30 and 31 wrap to 0x00.
    let next = QueueKey {
        lt: 10,
        hash: HashBytes(hash_with(&[(30, 0xFF), (31, 0xFF)])),
    }
    .next_value();
    assert_eq!(
        next.lt, 10,
        "LT must remain the same with partial 0xFF"
    );
    assert_eq!(
        next.hash.0,
        hash_with(&[(29, 0x01)]),
        "Hash should be incremented correctly with carry"
    );

    // Case 3: fully saturated hash — it wraps to zero and LT absorbs
    // the carry.
    let next = QueueKey {
        lt: 999,
        hash: HashBytes([0xFF; 32]),
    }
    .next_value();
    assert_eq!(
        next.lt, 1000,
        "LT must increment if hash was all 0xFF"
    );
    assert_eq!(next.hash.0, [0u8; 32], "Hash should reset to zero");

    // Case 4: mid-range carry — [.., 0x01, 0xFF, 0xFF] becomes
    // [.., 0x02, 0x00, 0x00].
    let next = QueueKey {
        lt: 50,
        hash: HashBytes(hash_with(&[(29, 0x01), (30, 0xFF), (31, 0xFF)])),
    }
    .next_value();
    assert_eq!(
        next.lt, 50,
        "LT should remain the same for a mid-range carry"
    );
    assert_eq!(
        next.hash.0,
        hash_with(&[(29, 0x02)]),
        "Hash should increment the correct byte after partial 0xFF"
    );
}
}
14 changes: 10 additions & 4 deletions collator/src/internal_queue/state/commited_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use everscale_types::models::{IntAddr, ShardIdent};
use tycho_block_util::queue::QueuePartitionIdx;
use tycho_storage::model::ShardsInternalMessagesKey;
use tycho_storage::{InternalQueueSnapshot, Storage};
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;
Expand Down Expand Up @@ -106,14 +107,19 @@ impl<V: InternalMessageValue> CommittedState<V> for CommittedStateStdImpl {
partition: QueuePartitionIdx,
ranges: &[QueueShardRange],
) -> Result<Box<dyn StateIterator<V>>> {
let mut shard_iters_with_ranges = Vec::new();
let mut shards_iters = Vec::new();

for range in ranges {
let iter = snapshot.iter_messages_commited();
shard_iters_with_ranges.push((iter, range.clone()));
// exclude from key
let from_key = range.from.next_value();
let from = ShardsInternalMessagesKey::new(partition, range.shard_ident, from_key);
// include to key
let to_key = range.to.next_value();
let to = ShardsInternalMessagesKey::new(partition, range.shard_ident, to_key);
shards_iters.push((snapshot.iter_messages_commited(from, to), range.shard_ident));
}

let iterator = StateIteratorImpl::new(partition, shard_iters_with_ranges, receiver)?;
let iterator = StateIteratorImpl::new(shards_iters, receiver)?;
Ok(Box::new(iterator))
}

Expand Down
59 changes: 4 additions & 55 deletions collator/src/internal_queue/state/shard_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,25 @@
use anyhow::Result;
use everscale_types::models::ShardIdent;
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_storage::model::ShardsInternalMessagesKey;
use tycho_block_util::queue::QueueKey;
use tycho_storage::InternalQueueMessagesIter;

use crate::types::ShortAddr;

#[derive(Clone, Debug)]
struct Range {
from: ShardsInternalMessagesKey,
to: ShardsInternalMessagesKey,
}

impl Range {
pub fn contains(&self, key: &ShardsInternalMessagesKey) -> bool {
key > &self.from && key <= &self.to
}
}

impl From<(QueuePartitionIdx, ShardIdent, QueueKey, QueueKey)> for Range {
fn from(value: (QueuePartitionIdx, ShardIdent, QueueKey, QueueKey)) -> Self {
let (partition, shard_ident, from, to) = value;

let from = ShardsInternalMessagesKey::new(partition, shard_ident, from);
let to = ShardsInternalMessagesKey::new(partition, shard_ident, to);

Range { from, to }
}
}

/// Outcome of one step of `ShardIterator`.
pub enum IterResult<'a> {
    /// A raw message payload borrowed from the underlying storage iterator;
    /// produced when the message's address belongs to the receiver shard
    /// (see the `receiver.contains_address` check in `ShardIterator::next`).
    Value(&'a [u8]),
    /// The current entry was skipped rather than yielded.
    /// NOTE(review): presumably `Some((shard, key))` identifies the skipped
    /// message for the non-receiver branch — the consuming match arm is not
    /// visible here, confirm against `ShardIterator`'s caller.
    Skip(Option<(ShardIdent, QueueKey)>),
}

pub struct ShardIterator {
range: Range,
receiver: ShardIdent,
iterator: InternalQueueMessagesIter,
}

impl ShardIterator {
pub fn new(
partition: QueuePartitionIdx,
shard_ident: ShardIdent,
from: QueueKey,
to: QueueKey,
receiver: ShardIdent,
mut iterator: InternalQueueMessagesIter,
) -> Self {
iterator.seek(&ShardsInternalMessagesKey::new(
partition,
shard_ident,
from,
));

let range = Range::from((partition, shard_ident, from, to));
pub fn new(receiver: ShardIdent, mut iterator: InternalQueueMessagesIter) -> Self {
iterator.seek_to_first();

Self {
range,
receiver,
iterator,
}
Self { receiver, iterator }
}

#[allow(clippy::should_implement_trait)]
Expand All @@ -70,15 +28,6 @@ impl ShardIterator {
return Ok(None);
};

// skip first key if it is equal to `from`
if msg.key == self.range.from {
return Ok(Some(IterResult::Skip(None)));
}

if !self.range.contains(&msg.key) {
return Ok(None);
}

let short_addr = ShortAddr::new(msg.workchain as i32, msg.prefix);

Ok(Some(if self.receiver.contains_address(&short_addr) {
Expand Down
20 changes: 6 additions & 14 deletions collator/src/internal_queue/state/state_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::sync::Arc;
use ahash::HashMapExt;
use anyhow::{bail, Context, Result};
use everscale_types::models::ShardIdent;
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_block_util::queue::QueueKey;
use tycho_storage::InternalQueueMessagesIter;
use tycho_util::FastHashMap;

use crate::internal_queue::state::shard_iterator::{IterResult, ShardIterator};
use crate::internal_queue::types::{InternalMessageValue, QueueShardRange};
use crate::internal_queue::types::InternalMessageValue;

pub struct ShardIteratorWithRange {
pub iter: InternalQueueMessagesIter,
Expand Down Expand Up @@ -86,21 +86,13 @@ pub struct StateIteratorImpl<V: InternalMessageValue> {

impl<V: InternalMessageValue> StateIteratorImpl<V> {
pub fn new(
partition: QueuePartitionIdx,
shard_iters_with_ranges: Vec<(InternalQueueMessagesIter, QueueShardRange)>,
shard_iters: Vec<(InternalQueueMessagesIter, ShardIdent)>,
receiver: ShardIdent,
) -> Result<Self> {
let mut iters = FastHashMap::with_capacity(shard_iters_with_ranges.len());
let mut iters = FastHashMap::with_capacity(shard_iters.len());

for (iter, range) in shard_iters_with_ranges {
let QueueShardRange {
shard_ident,
from,
to,
} = range;

let shard_iterator =
ShardIterator::new(partition, shard_ident, from, to, receiver, iter);
for (iter, shard_ident) in shard_iters {
let shard_iterator = ShardIterator::new(receiver, iter);

match iters.entry(shard_ident) {
Entry::Occupied(_) => {
Expand Down
17 changes: 13 additions & 4 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use anyhow::Result;
use everscale_types::models::{IntAddr, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
use tycho_storage::model::{QueueRange, StatKey};
use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey};
use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage};
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastHashMap, FastHashSet};
Expand Down Expand Up @@ -120,13 +120,22 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {
partition: QueuePartitionIdx,
ranges: &[QueueShardRange],
) -> Result<Box<dyn StateIterator<V>>> {
let mut shard_iters_with_ranges = Vec::new();
let mut shards_iters = Vec::new();

for range in ranges {
shard_iters_with_ranges.push((snapshot.iter_messages_uncommited(), range.clone()));
let from_key = range.from.next_value();
// exclude from key
let from = ShardsInternalMessagesKey::new(partition, range.shard_ident, from_key);
// include to key
let to_key = range.to.next_value();
let to = ShardsInternalMessagesKey::new(partition, range.shard_ident, to_key);
shards_iters.push((
snapshot.iter_messages_uncommited(from, to),
range.shard_ident,
));
}

let iterator = StateIteratorImpl::new(partition, shard_iters_with_ranges, receiver)?;
let iterator = StateIteratorImpl::new(shards_iters, receiver)?;
Ok(Box::new(iterator))
}

Expand Down
8 changes: 4 additions & 4 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,11 @@ async fn test_queue() -> anyhow::Result<()> {
let queue_range = QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: QueueKey {
lt: 0,
lt: 10000,
hash: HashBytes::default(),
},
to: QueueKey {
lt: 16000,
lt: 15500,
hash: HashBytes::default(),
},
};
Expand All @@ -472,7 +472,7 @@ async fn test_queue() -> anyhow::Result<()> {
while iterator_manager.next()?.is_some() {
read_count += 1;
}
assert_eq!(read_count, 15000);
assert_eq!(read_count, 5000);

let iterators = queue.iterator(
QueuePartitionIdx::default(),
Expand All @@ -485,7 +485,7 @@ async fn test_queue() -> anyhow::Result<()> {
read_count += 1;
}

assert_eq!(read_count, 1000);
assert_eq!(read_count, 500);

// check two diff iterator
let mut ranges = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ def collator_core_operations_metrics() -> RowPanel:
),
create_heatmap_panel(
"tycho_do_collate_build_statistics_time",
"Build statistics",
"async Apply message queue diff: inc. Build statistics",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
Expand Down
Loading

0 comments on commit 2d19642

Please sign in to comment.