Skip to content

Commit

Permalink
feature(collator): improve routing
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and SmaGMan committed Jan 15, 2025
1 parent b997127 commit 54db32d
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 30 deletions.
31 changes: 30 additions & 1 deletion collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,35 @@ impl Phase<FinalizeState> {
.cloned()
.unwrap_or_default();

let diffs = if self.state.collation_data.block_id_short.is_masterchain() {
let top_shard_blocks = if self.state.collation_data.block_id_short.is_masterchain() {
Some(
self.state
.collation_data
.top_shard_blocks
.iter()
.map(|b| (b.block_id.shard, b.block_id.seqno))
.collect(),
)
} else {
None
};

// load diffs by top shard blocks
mq_adapter.get_diffs(top_shard_blocks.clone().unwrap_or_default())
} else {
let blocks = self
.state
.mc_data
.shards
.iter()
.filter(|(_, d)| d.top_sc_block_updated)
.map(|(shard_ident, descr)| (*shard_ident, descr.seqno))
.collect();

mq_adapter.get_diffs(blocks)
};

// get queue diff and check for pending internals
let create_queue_diff_elapsed;
let FinalizedMessagesReader {
Expand All @@ -82,7 +111,7 @@ impl Phase<FinalizeState> {
&labels,
);
let finalize_message_reader_res =
messages_reader.finalize(self.extra.executor.min_next_lt())?;
messages_reader.finalize(self.extra.executor.min_next_lt(), diffs)?;
create_queue_diff_elapsed = histogram_create_queue_diff.finish();
finalize_message_reader_res
};
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl CollatorStdImpl {
if let Some(value) = min_processed_to {
mq_adapter.trim_diffs(&shard_id, &value)?;
};
let diff_tail_len = mq_adapter.get_diff_count_by_shard(&shard_id) as u32 + 1;
let diff_tail_len = mq_adapter.get_diffs_count_by_shard(&shard_id) as u32 + 1;

let span = tracing::Span::current();
let (finalize_phase_result, update_queue_task_result) = rayon::join(
Expand Down
74 changes: 64 additions & 10 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use new_messages::*;
pub(super) use reader_state::*;
use tycho_block_util::queue::RouterDirection;

use crate::internal_queue::queue::ShortQueueDiff;

pub(super) struct FinalizedMessagesReader {
pub has_unprocessed_messages: bool,
pub reader_state: ReaderState,
Expand Down Expand Up @@ -266,7 +268,11 @@ impl MessagesReader {
}
}

pub fn finalize(mut self, current_next_lt: u64) -> Result<FinalizedMessagesReader> {
pub fn finalize(
mut self,
current_next_lt: u64,
diffs: Vec<(ShardIdent, ShortQueueDiff)>,
) -> Result<FinalizedMessagesReader> {
let mut has_unprocessed_messages = self.has_messages_in_buffers()
|| self.has_pending_new_messages()
|| self.has_pending_externals_in_cache();
Expand Down Expand Up @@ -342,9 +348,11 @@ impl MessagesReader {

// reset queue diff partition router
// according to actual aggregated stats
Self::reset_partition_rounter_by_stats(
Self::reset_partition_router_by_stats(
&mut queue_diff_with_msgs.partition_router,
aggregated_stats,
self.for_shard_id,
diffs,
);

Ok(FinalizedMessagesReader {
Expand All @@ -355,19 +363,65 @@ impl MessagesReader {
})
}

pub fn reset_partition_rounter_by_stats(
pub fn reset_partition_router_by_stats(
partition_router: &mut PartitionRouter,
stats: QueueStatistics,
aggregated_stats: QueueStatistics,
for_shard_id: ShardIdent,
top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>,
) {
// TODO: msgs-v3: store limit in msgs_exec_params
const MAX_PAR_0_MSGS_COUNT_LIMIT: u64 = 100_000;

partition_router.clear();
for (account_addr, msgs_count) in stats {
if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT {
partition_router
.insert(RouterDirection::Dest, account_addr, 1)
.unwrap();
for (int_address, msgs_count) in aggregated_stats {
let int_address_bytes = int_address.as_std().unwrap().address;
let acc_for_current_shard = for_shard_id.contains_account(&int_address_bytes);

let existing_partition = partition_router.get_partition(None, &int_address);
if existing_partition != 0 {
continue;
}

if acc_for_current_shard {
if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT {
partition_router
.insert(RouterDirection::Dest, int_address, 1)
.unwrap();
}
} else {
// if we have account for another shard then take info from that shard
let acc_shard_diff_info = top_block_diffs
.iter()
.find(|(shard_id, _)| shard_id.contains_account(&int_address_bytes))
.map(|(_, diff)| diff);

if let Some(diff) = acc_shard_diff_info {
// if we found low priority partition in remote diff then copy it
let remote_shard_partition = diff.router.get_partition(None, &int_address);
if remote_shard_partition != 0 {
partition_router
.insert(RouterDirection::Dest, int_address, remote_shard_partition)
.unwrap();
continue;
}

// if remote partition == 0 then we need to check statistics
let remote_msgs_count = match diff.statistics.partition(0) {
None => 0,
Some(partition) => partition.get(&int_address).copied().unwrap_or(0),
};

let total = msgs_count + remote_msgs_count;

if total > MAX_PAR_0_MSGS_COUNT_LIMIT {
partition_router
.insert(RouterDirection::Dest, int_address, 1)
.unwrap();
}
} else if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT {
partition_router
.insert(RouterDirection::Dest, int_address, 1)
.unwrap();
}
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,9 +1307,7 @@ impl CollatorStdImpl {
// return reader state to working state
let FinalizedMessagesReader {
mut reader_state, ..
} = messages_reader.finalize(
0, // can pass 0 because new messages reader was not initialized in this case
)?;
} = messages_reader.finalize(0, vec![])?;
std::mem::swap(&mut working_state.reader_state, &mut reader_state);

working_state.has_unprocessed_messages = Some(has_pending_internals);
Expand Down
7 changes: 6 additions & 1 deletion collator/src/collator/tests/execution_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{
GetNextMessageGroupContext, GetNextMessageGroupMode, InitIteratorMode, MessagesReader,
};
use crate::internal_queue::iterator::{IterItem, QueueIterator};
use crate::internal_queue::queue::ShortQueueDiff;
use crate::internal_queue::types::{
DiffStatistics, EnqueuedMessage, InternalMessageValue, QueueDiffWithMessages, QueueFullDiff,
QueueRange, QueueShardRange, QueueStatistics,
Expand Down Expand Up @@ -140,7 +141,11 @@ impl<V: InternalMessageValue + Default> MessageQueueAdapter<V> for MessageQueueA
unimplemented!()
}

fn get_diff_count_by_shard(&self, _shard_ident: &ShardIdent) -> usize {
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)> {
todo!()
}

fn get_diffs_count_by_shard(&self, _shard_ident: &ShardIdent) -> usize {
unimplemented!()
}
}
Expand Down
47 changes: 40 additions & 7 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::internal_queue::state::uncommitted_state::{
UncommittedState, UncommittedStateFactory, UncommittedStateImplFactory, UncommittedStateStdImpl,
};
use crate::internal_queue::types::{
DiffStatistics, InternalMessageValue, QueueDiffWithMessages, QueueShardRange, QueueStatistics,
DiffStatistics, InternalMessageValue, PartitionRouter, QueueDiffWithMessages, QueueShardRange,
QueueStatistics,
};
use crate::tracing_targets;
use crate::types::ProcessedTo;
Expand Down Expand Up @@ -102,6 +103,8 @@ where
partition: QueuePartition,
ranges: Vec<QueueShardRange>,
) -> Result<QueueStatistics>;

fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -129,12 +132,13 @@ impl<V: InternalMessageValue> QueueFactory<V> for QueueFactoryStdImpl {
}
}

#[derive(Debug)]
struct ShortQueueDiff {
#[derive(Debug, Clone)]
pub struct ShortQueueDiff {
pub processed_to: ProcessedTo,
pub max_message: QueueKey,
pub partitions: FastHashSet<QueuePartition>,
pub router: PartitionRouter,
pub hash: HashBytes,
pub statistics: DiffStatistics,
}

pub struct QueueImpl<S, P, V>
Expand Down Expand Up @@ -225,15 +229,16 @@ where
block_id_short.shard,
&diff.partition_router,
&diff.messages,
statistics,
&statistics,
)?;
}

let short_diff = ShortQueueDiff {
processed_to: diff.processed_to,
max_message,
hash: *hash,
partitions: diff.partition_router.partitions().clone(),
router: diff.partition_router,
statistics,
};

// Insert the diff into the shard diffs
Expand Down Expand Up @@ -265,7 +270,7 @@ where
.entry(block_id_short.shard)
.or_insert_with(|| shard_diff.max_message);

for partition in &shard_diff.partitions {
for partition in shard_diff.router.partitions() {
partitions.insert(*partition);
}

Expand Down Expand Up @@ -386,4 +391,32 @@ where

Ok(statistics)
}

fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)> {
blocks
.into_iter()
.flat_map(|(shard_ident, seqno)| {
let mut diffs = vec![];
if let Some(shard_diffs) = self.uncommitted_diffs.get(&shard_ident) {
shard_diffs
.value()
.iter()
.filter(|(block_seqno, _)| **block_seqno <= seqno)
.for_each(|(_, diff)| {
diffs.push((shard_ident, diff.clone()));
});
}
if let Some(shard_diffs) = self.committed_diffs.get(&shard_ident) {
shard_diffs
.value()
.iter()
.filter(|(block_seqno, _)| **block_seqno <= seqno)
.for_each(|(_, diff)| {
diffs.push((shard_ident, diff.clone()));
});
}
diffs
})
.collect()
}
}
6 changes: 3 additions & 3 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub trait LocalUncommittedState<V: InternalMessageValue> {
source: ShardIdent,
partition_router: &PartitionRouter,
messages: &BTreeMap<QueueKey, Arc<V>>,
statistics: DiffStatistics,
statistics: &DiffStatistics,
) -> Result<()>;

fn load_statistics(
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {
source: ShardIdent,
partition_router: &PartitionRouter,
messages: &BTreeMap<QueueKey, Arc<V>>,
statistics: DiffStatistics,
statistics: &DiffStatistics,
) -> Result<()> {
let mut batch = WriteBatch::default();

Expand Down Expand Up @@ -231,7 +231,7 @@ impl UncommittedStateStdImpl {
fn add_statistics(
&self,
batch: &mut WriteBatch,
diff_statistics: DiffStatistics,
diff_statistics: &DiffStatistics,
) -> Result<()> {
let shard_ident = diff_statistics.shard_ident();
let min_message = diff_statistics.min_message();
Expand Down
7 changes: 6 additions & 1 deletion collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ impl IntoIterator for QueueStatistics {
}
}

#[derive(Debug, Clone)]
pub struct DiffStatistics {
inner: Arc<DiffStatisticsInner>,
}
Expand All @@ -371,8 +372,12 @@ impl DiffStatistics {
pub fn max_message(&self) -> &QueueKey {
&self.inner.max_message
}
}

pub fn partition(&self, partition: QueuePartition) -> Option<&FastHashMap<IntAddr, u64>> {
self.inner.statistics.get(&partition)
}
}
#[derive(Debug, Clone)]
struct DiffStatisticsInner {
shard_ident: ShardIdent,
min_message: QueueKey,
Expand Down
1 change: 1 addition & 0 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,7 @@ where

let queue_diff_with_messages =
QueueDiffWithMessages::from_queue_diff(&queue_diff_stuff, &out_msgs)?;

prev_queue_diffs.push((
queue_diff_with_messages,
*queue_diff_stuff.diff_hash(),
Expand Down
12 changes: 9 additions & 3 deletions collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockIdShort, ShardIdent};
use tracing::instrument;
use tycho_block_util::queue::{QueueKey, QueuePartition};
use tycho_util::FastHashMap;

use crate::internal_queue::iterator::{QueueIterator, QueueIteratorImpl};
use crate::internal_queue::queue::{Queue, QueueImpl};
use crate::internal_queue::queue::{Queue, QueueImpl, ShortQueueDiff};
use crate::internal_queue::state::commited_state::CommittedStateStdImpl;
use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
use crate::internal_queue::state::uncommitted_state::UncommittedStateStdImpl;
Expand Down Expand Up @@ -70,9 +71,10 @@ where
fn clear_uncommitted_state(&self) -> Result<()>;
/// removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard`
fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>;
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)>;

/// returns the number of diffs in cache for the given shard
fn get_diff_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
}

impl<V: InternalMessageValue> MessageQueueAdapterStdImpl<V> {
Expand Down Expand Up @@ -197,7 +199,11 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
self.queue.trim_diffs(source_shard, inclusive_until)
}

fn get_diff_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)> {
self.queue.get_diffs(blocks)
}

fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize {
self.queue.get_diffs_count_by_shard(shard_ident)
}
}

0 comments on commit 54db32d

Please sign in to comment.