diff --git a/collator/src/collator/messages_buffer.rs b/collator/src/collator/messages_buffer.rs index e7b5a9326..ba0cf6b60 100644 --- a/collator/src/collator/messages_buffer.rs +++ b/collator/src/collator/messages_buffer.rs @@ -101,6 +101,17 @@ impl MessagesBuffer { } } + pub fn remove_messages_by_accounts(&mut self, addresses_to_remove: &FastHashSet) { + self.msgs.retain(|k, v| { + if addresses_to_remove.contains(k) { + self.int_count -= v.len(); + false + } else { + true + } + }); + } + /// Returns queue keys of collected internal queue messages. pub fn fill_message_group( &mut self, diff --git a/collator/src/collator/messages_reader/mod.rs b/collator/src/collator/messages_reader/mod.rs index 6cdedbe3e..63f3d9d02 100644 --- a/collator/src/collator/messages_reader/mod.rs +++ b/collator/src/collator/messages_reader/mod.rs @@ -3,8 +3,10 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; +use everscale_types::cell::HashBytes; use everscale_types::models::{MsgsExecutionParams, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; +use tycho_util::FastHashSet; use self::externals_reader::*; use self::internals_reader::*; @@ -20,7 +22,7 @@ use crate::internal_queue::types::{ use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; use crate::types::processed_upto::{BlockSeqno, Lt}; -use crate::types::DebugIter; +use crate::types::{DebugIter, IntAdrExt, ProcessedTo}; mod externals_reader; mod internals_reader; @@ -309,6 +311,21 @@ impl MessagesReader { } } + fn get_min_internals_processed_to_by_shards(&self) -> ProcessedTo { + let mut min_internals_processed_to = ProcessedTo::default(); + + for par_reader in self.internals_partition_readers.values() { + for (shard_id, key) in &par_reader.reader_state().processed_to { + min_internals_processed_to + .entry(*shard_id) + .and_modify(|min_key| *min_key = std::cmp::min(*min_key, *key)) + .or_insert(*key); + } + } + + min_internals_processed_to + } + pub fn finalize( mut self, current_next_lt: u64, @@ -324,7 +341,7 @@ impl MessagesReader { // collect internals partition readers states let mut internals_reader_state = InternalsReaderState::default(); - for (par_id, mut par_reader) in self.internals_partition_readers { + for (par_id, par_reader) in self.internals_partition_readers.iter_mut() { // collect aggregated messages stats for range_reader in par_reader.range_readers().values() { if range_reader.fully_read && range_reader.reader_state.buffer.msgs_count() == 0 { @@ -350,36 +367,21 @@ impl MessagesReader { .get_last_range_reader()? .1 .reader_state() - .get_state_by_partition(par_id)?; + .get_state_by_partition(*par_id)?; if last_int_range_reader.reader_state.skip_offset == last_ext_range_reader.skip_offset { par_reader.drop_processing_offset(true)?; - self.externals_reader.drop_processing_offset(par_id, true)?; + self.externals_reader + .drop_processing_offset(*par_id, true)?; } } } - - let par_reader_state = par_reader.finalize(current_next_lt)?; - internals_reader_state - .partitions - .insert(par_id, par_reader_state); } - // collect externals reader state - let FinalizedExternalsReader { - externals_reader_state, - anchors_cache, - } = self.externals_reader.finalize()?; - - let reader_state = ReaderState { - externals: externals_reader_state, - internals: internals_reader_state, - }; - // build queue diff - let min_internals_processed_to = reader_state.internals.get_min_processed_to_by_shards(); + let min_internals_processed_to = self.get_min_internals_processed_to_by_shards(); let mut queue_diff_with_msgs = self .new_messages .into_queue_diff_with_messages(min_internals_processed_to); @@ -390,7 +392,7 @@ impl MessagesReader { // reset queue diff partition router // according to actual aggregated stats - Self::reset_partition_router_by_stats( + let moved_from_par_0_accounts = Self::reset_partition_router_by_stats( &self.msgs_exec_params, &mut queue_diff_with_msgs.partition_router, aggregated_stats, @@ -398,6 +400,35 @@ impl MessagesReader { diffs, )?; + // remove moved accounts from partition 0 buffer + let par_reader = self.internals_partition_readers.get_mut(&0).unwrap(); + if let Ok(last_int_range_reader) = par_reader.get_last_range_reader_mut() { + if last_int_range_reader.kind == InternalsRangeReaderKind::NewMessages { + last_int_range_reader + .reader_state + .buffer + .remove_messages_by_accounts(&moved_from_par_0_accounts); + } + } + + // collect internals reader state + for (par_id, par_reader) in self.internals_partition_readers { + internals_reader_state + .partitions + .insert(par_id, par_reader.finalize(current_next_lt)?); + } + + // collect externals reader state + let FinalizedExternalsReader { + externals_reader_state, + anchors_cache, + } = self.externals_reader.finalize()?; + + let reader_state = ReaderState { + externals: externals_reader_state, + internals: internals_reader_state, + }; + Ok(FinalizedMessagesReader { has_unprocessed_messages, reader_state, @@ -412,8 +443,9 @@ impl MessagesReader { aggregated_stats: QueueStatistics, for_shard_id: ShardIdent, top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>, - ) -> Result<()> { + ) -> Result> { let par_0_msgs_count_limit = msgs_exec_params.par_0_int_msgs_count_limit as u64; + let mut moved_from_par_0_accounts = FastHashSet::default(); for (dest_int_address, msgs_count) in aggregated_stats { let existing_partition = partition_router.get_partition(None, &dest_int_address); @@ -435,6 +467,7 @@ impl MessagesReader { dest_int_address, msgs_count, ); partition_router.insert_dst(&dest_int_address, 1)?; + moved_from_par_0_accounts.insert(dest_int_address.get_address()); } } else { tracing::trace!(target: tracing_targets::COLLATOR, @@ -513,11 +546,12 @@ impl MessagesReader { dest_int_address, total_msgs, ); partition_router.insert_dst(&dest_int_address, 1)?; + moved_from_par_0_accounts.insert(dest_int_address.get_address()); } } } - Ok(()) + Ok(moved_from_par_0_accounts) } pub fn last_read_to_anchor_chain_time(&self) -> Option {