diff --git a/block-util/src/lib.rs b/block-util/src/lib.rs index 4e003ab75..0ea3f82b7 100644 --- a/block-util/src/lib.rs +++ b/block-util/src/lib.rs @@ -4,5 +4,4 @@ pub mod config; pub mod dict; pub mod queue; pub mod state; - pub mod tl; diff --git a/block-util/src/queue/proto.rs b/block-util/src/queue/proto.rs index b2fe64d5b..fc2fa022a 100644 --- a/block-util/src/queue/proto.rs +++ b/block-util/src/queue/proto.rs @@ -22,7 +22,8 @@ pub struct QueueDiff { /// Seqno of the corresponding block. pub seqno: u32, /// collator boundaries. - pub processed_upto: BTreeMap, + // TODO: should rename field in `proto.tl` on network reset + pub processed_to: BTreeMap, /// Min message queue key. pub min_message: QueueKey, /// Max message queue key. @@ -55,7 +56,7 @@ impl TlWrite for QueueDiff { 4 + tl::hash_bytes::SIZE_HINT + tl::shard_ident::SIZE_HINT + 4 - + processed_upto_map::size_hint(&self.processed_upto) + + processed_to_map::size_hint(&self.processed_to) + 2 * QueueKey::SIZE_HINT + messages_list::size_hint(&self.messages) } @@ -68,7 +69,7 @@ impl TlWrite for QueueDiff { tl::hash_bytes::write(&self.prev_hash, packet); tl::shard_ident::write(&self.shard_ident, packet); packet.write_u32(self.seqno); - processed_upto_map::write(&self.processed_upto, packet); + processed_to_map::write(&self.processed_to, packet); self.min_message.write_to(packet); self.max_message.write_to(packet); messages_list::write(&self.messages, packet); @@ -90,7 +91,7 @@ impl<'tl> TlRead<'tl> for QueueDiff { prev_hash: tl::hash_bytes::read(data, offset)?, shard_ident: tl::shard_ident::read(data, offset)?, seqno: u32::read_from(data, offset)?, - processed_upto: processed_upto_map::read(data, offset)?, + processed_to: processed_to_map::read(data, offset)?, min_message: QueueKey::read_from(data, offset)?, max_message: QueueKey::read_from(data, offset)?, messages: messages_list::read(data, offset)?, @@ -205,7 +206,7 @@ impl std::fmt::Display for QueueKey { } } -mod processed_upto_map { +mod processed_to_map { use tl_proto::{TlPacket, TlResult}; use super::*; @@ -445,7 +446,7 @@ mod tests { prev_hash: HashBytes::from([0x33; 32]), shard_ident: ShardIdent::MASTERCHAIN, seqno: 123, - processed_upto: BTreeMap::from([ + processed_to: BTreeMap::from([ (ShardIdent::MASTERCHAIN, QueueKey { lt: 1, hash: HashBytes::from([0x11; 32]), @@ -491,7 +492,7 @@ mod tests { prev_hash, shard_ident: ShardIdent::MASTERCHAIN, seqno, - processed_upto: BTreeMap::from([ + processed_to: BTreeMap::from([ (ShardIdent::MASTERCHAIN, QueueKey { lt: 10 * seqno as u64, hash: HashBytes::from([seqno as u8; 32]), diff --git a/block-util/src/queue/queue_diff.rs b/block-util/src/queue/queue_diff.rs index a9968a542..e73cb56da 100644 --- a/block-util/src/queue/queue_diff.rs +++ b/block-util/src/queue/queue_diff.rs @@ -31,7 +31,7 @@ impl QueueDiffStuffBuilder { where I: IntoIterator, { - self.inner_mut().diff.processed_upto = processed_to + self.inner_mut().diff.processed_to = processed_to .into_iter() .map(|(shard_ident, lt, hash)| (shard_ident, QueueKey { lt, hash: *hash })) .collect(); @@ -79,6 +79,14 @@ impl SerializedQueueDiff { &self.inner.diff.hash } + pub fn processed_to(&self) -> impl Iterator { + self.inner + .diff + .processed_to + .iter() + .map(|(shard_ident, key)| (*shard_ident, key)) + } + fn inner_mut(&mut self) -> &mut Inner { Arc::get_mut(&mut self.inner).expect("inner is not shared") } @@ -102,7 +110,7 @@ impl QueueDiffStuff { prev_hash: HashBytes::ZERO, shard_ident: block_id.shard, seqno: block_id.seqno, - processed_upto: BTreeMap::from([(block_id.shard, QueueKey::MIN)]), + processed_to: BTreeMap::from([(block_id.shard, QueueKey::MIN)]), min_message: QueueKey::MIN, max_message: QueueKey::MIN, messages: Vec::new(), @@ -134,7 +142,7 @@ impl QueueDiffStuff { prev_hash: *prev_hash, shard_ident, seqno, - processed_upto: Default::default(), + processed_to: Default::default(), min_message: Default::default(), max_message: Default::default(), messages: Default::default(), @@ -361,7 +369,7 @@ mod tests { prev_hash: HashBytes::ZERO, shard_ident: ShardIdent::BASECHAIN, seqno: 1, - processed_upto: Default::default(), + processed_to: Default::default(), min_message: QueueKey { lt: 0, hash: message_hashes[0], diff --git a/collator/src/collator/do_collate/finalize.rs b/collator/src/collator/do_collate/finalize.rs index 12541658f..13168b429 100644 --- a/collator/src/collator/do_collate/finalize.rs +++ b/collator/src/collator/do_collate/finalize.rs @@ -175,17 +175,7 @@ impl Phase { .finalize .clone(); - let processed_to = self - .state - .collation_data - .processed_upto - .internals - .iter() - .map(|(shard_ident, processed_upto_stuff)| { - (*shard_ident, processed_upto_stuff.processed_to_msg) - }) - .collect(); - + let processed_to = queue_diff.processed_to().map(|(k, v)| (k, *v)).collect(); let shard = self.state.collation_data.block_id_short.shard; let labels = &[("workchain", shard.workchain().to_string())]; @@ -595,11 +585,7 @@ impl Phase { let mut shards_processed_to = FastHashMap::default(); - for (shard_id, shard_data) in shards.iter() { - if !shard_data.top_sc_block_updated { - continue; - } - + for (shard_id, _) in shards.iter() { // Extract processed information for updated shards let shard_processed_to = self .state @@ -653,7 +639,13 @@ impl Phase { block: new_block, is_key_block: new_block_info.key_block, prev_blocks_ids: self.state.prev_shard_data.blocks_ids().clone(), - top_shard_blocks: self.state.collation_data.top_shard_blocks.clone(), + top_shard_blocks_ids: self + .state + .collation_data + .top_shard_blocks + .iter() + .map(|b| b.block_id) + .collect(), collated_data, collated_file_hash: HashBytes::ZERO, chain_time: self.state.collation_data.get_gen_chain_time(), diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index 0603bf5f3..866494de4 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -33,7 +33,8 @@ use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; use crate::types::{ BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig, - DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, TopBlockDescription, TopShardBlockInfo, + DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, ShardDescriptionShort, + TopBlockDescription, TopShardBlockInfo, }; #[cfg(test)] @@ -287,53 +288,21 @@ impl CollatorStdImpl { let finalize_block_timer = std::time::Instant::now(); - let mut min_processed_to: Option = None; - // Get current and masterchain processed values let current_processed_to = processed_to.get(&shard_id).cloned(); - let mc_processed_to = mc_data.processed_upto.internals.get(&shard_id).cloned(); - - if shard_id.is_masterchain() { - // Iterate through shards to find the minimum processed value from shards - for shard_processed_to in mc_data.shards_processed_to.values() { - if let Some(value) = shard_processed_to.get(&shard_id) { - min_processed_to = min_processed_to.map(|current_min| current_min.min(*value)); - } - } - - // Combine with current and masterchain values - min_processed_to = [current_processed_to, min_processed_to] - .into_iter() - .flatten() - .min(); - } else { - // Iterate through shards for non-masterchain - // replace for current shard processed upto from current collation - for (iter_shard_ident, shard_processed_to) in &mc_data.shards_processed_to { - let shard_processed_to = if iter_shard_ident == &shard_id { - current_processed_to - } else { - shard_processed_to.get(&shard_id).cloned() - }; - - // find minimum processed upto value - if let Some(value) = shard_processed_to { - min_processed_to = match min_processed_to { - Some(current_min) => Some(current_min.min(value)), - None => Some(value), - }; - } - } - - // Combine with masterchain processed value - min_processed_to = [ - min_processed_to, - mc_processed_to.map(|mc| mc.processed_to_msg), - ] - .into_iter() - .flatten() - .min(); - } + let mc_processed_to = mc_data + .processed_upto + .internals + .get(&shard_id) + .map(|mc| mc.processed_to_msg); + + let min_processed_to = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data.shards, + &mc_data.shards_processed_to, + ); if let Some(value) = min_processed_to { mq_adapter.trim_diffs(&shard_id, &value)?; @@ -1369,3 +1338,71 @@ impl CollatorStdImpl { ); } } + +fn calculate_min_processed_to( + shard_id: &ShardIdent, + current_processed_to: Option, + mc_processed_to: Option, + mc_data_shards: &Vec<(ShardIdent, ShardDescriptionShort)>, + mc_data_shards_processed_to: &FastHashMap, +) -> Option { + fn find_min_processed_to( + shards: &Vec<(ShardIdent, ShardDescriptionShort)>, + shards_processed_to: &FastHashMap, + shard_id: &ShardIdent, + min_processed_to: &mut Option, + skip_condition: impl Fn(&ShardIdent) -> bool, + ) { + // Iterate through shards with updated top shard blocks and find min processed_to + for (shard, descr) in shards { + if skip_condition(shard) { + continue; + } + + if descr.top_sc_block_updated { + if let Some(value) = shards_processed_to.get(shard) { + if let Some(v) = value.get(shard_id) { + *min_processed_to = match *min_processed_to { + Some(current_min) => Some(current_min.min(*v)), + None => Some(*v), + }; + } + } + } + } + } + + let mut min_processed_to: Option = None; + + if shard_id.is_masterchain() { + find_min_processed_to( + mc_data_shards, + mc_data_shards_processed_to, + shard_id, + &mut min_processed_to, + |_| false, + ); + + // Combine with current and masterchain values + min_processed_to = [current_processed_to, min_processed_to] + .into_iter() + .flatten() + .min(); + } else { + find_min_processed_to( + mc_data_shards, + mc_data_shards_processed_to, + shard_id, + &mut min_processed_to, + |shard| shard == shard_id || shard.is_masterchain(), + ); + + // Combine with current and masterchain values and shard values + min_processed_to = [current_processed_to, min_processed_to, mc_processed_to] + .into_iter() + .flatten() + .min(); + } + + min_processed_to +} diff --git a/collator/src/collator/tests/do_collate_tests.rs b/collator/src/collator/tests/do_collate_tests.rs index e24d824c4..8e9778662 100644 --- a/collator/src/collator/tests/do_collate_tests.rs +++ b/collator/src/collator/tests/do_collate_tests.rs @@ -1,16 +1,19 @@ +use std::collections::BTreeMap; use std::sync::Arc; use everscale_types::models::*; use everscale_types::prelude::*; +use tycho_block_util::queue::QueueKey; +use tycho_util::FastHashMap; +use crate::collator::do_collate::calculate_min_processed_to; use crate::collator::types::{ BlockCollationData, BlockCollationDataBuilder, ParsedExternals, ReadNextExternalsMode, }; use crate::collator::{AnchorsCache, CollatorStdImpl}; use crate::mempool::make_stub_anchor; use crate::test_utils::try_init_test_tracing; -use crate::types::supported_capabilities; - +use crate::types::{supported_capabilities, ShardDescriptionShort}; pub(crate) fn fill_test_anchors_cache(anchors_cache: &mut AnchorsCache, shard_id: ShardIdent) { let mut prev_anchor_id = 0; for anchor_id in 1..=40 { @@ -296,3 +299,138 @@ fn test_read_next_externals() { let kv = anchors_cache.get(0); assert!(kv.is_none()); } + +#[test] +fn test_calculate_min_processed_to_masterchain() { + // Mock data for masterchain test + let shard_id = ShardIdent::MASTERCHAIN; + + let current_processed_to = Some(QueueKey::max_for_lt(5)); + let mc_processed_to = Some(QueueKey::max_for_lt(5)); + + let updated_shard = ShardIdent::new_full(0); + let not_updated_shard = ShardIdent::new_full(1); + + let mc_data_shards = vec![ + (updated_shard, ShardDescriptionShort { + top_sc_block_updated: true, + ..Default::default() + }), + (not_updated_shard, ShardDescriptionShort { + top_sc_block_updated: false, + ..Default::default() + }), + ]; + + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(4)); + + // check updated + mc_data_shards_processed_to.insert(updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // updated shard should override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(4))); + + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(4)); + + // check updated + mc_data_shards_processed_to.insert(not_updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // not updated shard should not override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(5))); +} + +#[test] +fn test_calculate_min_processed_to_shard() { + // Mock data for shard test + let shard_id = ShardIdent::new_full(2); + + let current_processed_to = Some(QueueKey::max_for_lt(10)); + + let updated_shard = ShardIdent::new_full(3); + let not_updated_shard = ShardIdent::new_full(4); + + let mc_data_shards = vec![ + (updated_shard, ShardDescriptionShort { + top_sc_block_updated: true, + ..Default::default() + }), + (not_updated_shard, ShardDescriptionShort { + top_sc_block_updated: false, + ..Default::default() + }), + ]; + + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(8)); + let mc_processed_to = Some(QueueKey::max_for_lt(9)); + + // Check updated shard + mc_data_shards_processed_to.insert(updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // Updated shard should override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(8))); + + // Reset processed_to for not-updated shard + let mut mc_data_shards_processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::new(); + processed_to.insert(shard_id, QueueKey::max_for_lt(8)); + let mc_processed_to = Some(QueueKey::max_for_lt(11)); + // Check not updated shard + mc_data_shards_processed_to.insert(not_updated_shard, processed_to); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // Not updated shard should not override current_processed_to + assert_eq!(result, Some(QueueKey::max_for_lt(10))); + + // Verify combination with masterchain value + let mc_data_shards_processed_to = FastHashMap::default(); + + let mc_processed_to = Some(QueueKey::max_for_lt(9)); + + let result = calculate_min_processed_to( + &shard_id, + current_processed_to, + mc_processed_to, + &mc_data_shards, + &mc_data_shards_processed_to, + ); + + // Minimum value should be returned + assert_eq!(result, Some(QueueKey::max_for_lt(9))); +} diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 196750acf..1a7be07a9 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -22,7 +22,9 @@ use tycho_util::FastHashMap; use crate::mempool::{MempoolAnchor, MempoolAnchorId}; use crate::tracing_targets; -use crate::types::{BlockCandidate, McData, ProcessedUptoInfoStuff, ProofFunds, TopShardBlockInfo}; +use crate::types::{ + BlockCandidate, McData, ProcessedTo, ProcessedUptoInfoStuff, ProofFunds, TopShardBlockInfo, +}; pub(super) struct WorkingState { pub next_block_id_short: BlockIdShort, @@ -1279,7 +1281,7 @@ pub struct UpdateQueueDiffResult { pub has_unprocessed_messages: bool, pub diff_messages_len: usize, pub create_queue_diff_elapsed: Duration, - pub processed_to: BTreeMap, + pub processed_to: ProcessedTo, } pub struct FinalizedCollationResult { diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index efebc2a25..53549c03f 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -19,7 +19,8 @@ use crate::internal_queue::state::uncommitted_state::{ UncommittedState, UncommittedStateFactory, UncommittedStateImplFactory, UncommittedStateStdImpl, }; use crate::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; - +use crate::tracing_targets; +use crate::types::ProcessedTo; // FACTORY #[derive(Debug, Serialize, Deserialize)] @@ -118,7 +119,7 @@ impl QueueFactory for QueueFactoryStdImpl { } struct ShortQueueDiff { - pub processed_to: BTreeMap, + pub processed_to: ProcessedTo, pub end_key: QueueKey, pub hash: HashBytes, } @@ -287,7 +288,16 @@ where } fn clear_uncommitted_state(&self) -> Result<()> { + let diffs_before_clear: usize = + self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); self.uncommitted_diffs.clear(); + let diffs_after_clear: usize = self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); + tracing::info!( + target: tracing_targets::MQ, + diffs_before_clear, + diffs_after_clear, + "Cleared uncommitted diffs.", + ); self.uncommitted_state.truncate() } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 62d5d3679..db6ad2942 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -5,15 +5,16 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use everscale_types::boc::Boc; use everscale_types::cell::{Cell, HashBytes, Load}; -use everscale_types::models::{IntAddr, IntMsgInfo, Message, MsgInfo, OutMsgDescr, ShardIdent}; +use everscale_types::models::{IntAddr, IntMsgInfo, Message, MsgInfo, OutMsgDescr}; use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey}; use super::state::state_iterator::MessageExt; +use crate::types::ProcessedTo; #[derive(Default, Debug, Clone)] pub struct QueueDiffWithMessages { pub messages: BTreeMap>, - pub processed_to: BTreeMap, + pub processed_to: ProcessedTo, } impl QueueDiffWithMessages { @@ -30,10 +31,7 @@ impl QueueDiffWithMessages { queue_diff_stuff: &QueueDiffStuff, out_msg_description: &OutMsgDescr, ) -> Result { - let QueueDiff { - processed_upto: processed_to, - .. - } = queue_diff_stuff.as_ref(); + let QueueDiff { processed_to, .. } = queue_diff_stuff.as_ref(); let processed_to = processed_to .iter() .map(|(shard_ident, key)| (*shard_ident, *key)) diff --git a/collator/src/manager/blocks_cache.rs b/collator/src/manager/blocks_cache.rs index bf7c91798..b8073711e 100644 --- a/collator/src/manager/blocks_cache.rs +++ b/collator/src/manager/blocks_cache.rs @@ -6,7 +6,7 @@ use everscale_types::models::{ BlockId, BlockIdShort, ConsensusInfo, Lazy, OutMsgDescr, ShardIdent, ValueFlow, }; use parking_lot::Mutex; -use tycho_block_util::queue::{QueueDiffStuff, QueueKey}; +use tycho_block_util::queue::QueueDiffStuff; use tycho_block_util::state::ShardStateStuff; use tycho_util::{FastDashMap, FastHashMap}; @@ -18,7 +18,8 @@ use crate::manager::types::{AdditionalShardBlockCacheInfo, BlockCacheEntryData}; use crate::state_node::StateNodeAdapter; use crate::tracing_targets; use crate::types::{ - BlockCandidate, DisplayIntoIter, DisplayIter, McData, ProofFunds, TopBlockDescription, + BlockCandidate, DisplayIntoIter, DisplayIter, McData, ProcessedTo, ProofFunds, + TopBlockDescription, }; use crate::validator::ValidationStatus; @@ -139,7 +140,7 @@ impl BlocksCache { pub fn get_all_processed_to_by_mc_block_from_cache( &self, mc_block_key: &BlockCacheKey, - ) -> Result>>> { + ) -> Result>> { let mut all_processed_to = FastHashMap::default(); if mc_block_key.seqno == 0 { @@ -156,11 +157,7 @@ impl BlocksCache { ) }; - let processed_to = mc_block_entry - .int_processed_to() - .iter() - .map(|(shard, queue_key)| (*shard, *queue_key)) - .collect(); + let processed_to = mc_block_entry.int_processed_to().clone(); updated_top_shard_block_ids = mc_block_entry .top_shard_blocks_info @@ -177,7 +174,7 @@ impl BlocksCache { continue; } - let mut processed_to = FastHashMap::default(); + let mut processed_to = BTreeMap::default(); // try to find in cache if let Some(shard_cache) = self.shards.get(&top_sc_block_id.shard) { diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index b3c9bb2ea..bd3ac306e 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -12,7 +12,6 @@ use everscale_types::models::{ use parking_lot::{Mutex, RwLock}; use tokio::sync::Notify; use tycho_block_util::block::{calc_next_block_id_short, ValidatorSubsetInfo}; -use tycho_block_util::queue::QueueKey; use tycho_block_util::state::ShardStateStuff; use tycho_core::global_config::MempoolGlobalConfig; use tycho_util::metrics::HistogramGuard; @@ -40,7 +39,7 @@ use crate::state_node::{StateNodeAdapter, StateNodeAdapterFactory, StateNodeEven use crate::types::{ BlockCollationResult, BlockIdExt, CollationSessionId, CollationSessionInfo, CollatorConfig, DebugIter, DisplayAsShortId, DisplayBlockIdsIntoIter, DisplayIter, DisplayTuple, McData, - ShardDescriptionExt, ShardDescriptionShort, ShardHashesExt, + ProcessedTo, ShardDescriptionExt, ShardDescriptionShort, ShardHashesExt, }; use crate::utils::async_dispatcher::{AsyncDispatcher, STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE}; use crate::utils::block::detect_top_processed_to_anchor; @@ -1228,7 +1227,7 @@ where // try load required previous queue diffs let mut prev_queue_diffs = vec![]; - for (shard_id, min_processed_to) in min_processed_to_by_shards { + for (shard_id, min_processed_to) in &min_processed_to_by_shards { let Some((_, prev_block_ids)) = before_tail_block_ids.get(shard_id) else { continue; }; @@ -1310,6 +1309,10 @@ where max_message_key, )?; } + // trim diffs tails for all shards + for (shard_id, min_processed_to) in min_processed_to_by_shards { + self.mq_adapter.trim_diffs(shard_id, min_processed_to)?; + } // sync all applied blocks // and refresh collation session by the last one @@ -1399,7 +1402,7 @@ where async fn read_min_processed_to_for_mc_block( &self, mc_block_key: &BlockCacheKey, - ) -> Result>> { + ) -> Result> { let mut result = FastHashMap::default(); if mc_block_key.seqno == 0 { @@ -1421,7 +1424,7 @@ where ) .await? .as_ref() - .processed_upto + .processed_to .clone() .into_iter() .collect() diff --git a/collator/src/manager/types.rs b/collator/src/manager/types.rs index 9d28273b7..4bc3cd63e 100644 --- a/collator/src/manager/types.rs +++ b/collator/src/manager/types.rs @@ -1,17 +1,17 @@ -use std::collections::BTreeMap; use std::fmt::{Debug, Display}; use std::sync::Arc; use anyhow::{anyhow, Result}; use everscale_types::models::{BlockId, BlockIdShort, BlockInfo, Lazy, OutMsgDescr, ShardIdent}; use tokio::sync::Notify; -use tycho_block_util::queue::{QueueDiffStuff, QueueKey}; +use tycho_block_util::queue::QueueDiffStuff; use tycho_block_util::state::ShardStateStuff; use tycho_network::PeerId; use tycho_util::FastHashMap; use crate::types::{ - ArcSignature, BlockCandidate, BlockStuffForSync, DebugDisplayOpt, McData, ShardDescriptionExt, + ArcSignature, BlockCandidate, BlockStuffForSync, DebugDisplayOpt, McData, ProcessedTo, + ShardDescriptionExt, }; pub(super) type BlockCacheKey = BlockIdShort; @@ -112,7 +112,7 @@ impl From for BlockStuffForSync { ref_by_mc_seqno, block: block_stuff_aug, prev_blocks_ids, - top_shard_blocks, + top_shard_blocks_ids, queue_diff_aug, consensus_info, .. @@ -125,7 +125,7 @@ impl From for BlockStuffForSync { signatures, total_signature_weight, prev_blocks_ids, - top_shard_blocks_ids: top_shard_blocks.into_iter().map(|i| i.block_id).collect(), + top_shard_blocks_ids, consensus_info, } } @@ -328,18 +328,12 @@ impl BlockCacheEntry { } } - pub fn int_processed_to(&self) -> &BTreeMap { + pub fn int_processed_to(&self) -> &ProcessedTo { match &self.data { BlockCacheEntryData::Collated { candidate_stuff, .. - } => { - &candidate_stuff - .candidate - .queue_diff_aug - .diff() - .processed_upto - } - BlockCacheEntryData::Received { queue_diff, .. } => &queue_diff.diff().processed_upto, + } => &candidate_stuff.candidate.queue_diff_aug.diff().processed_to, + BlockCacheEntryData::Received { queue_diff, .. } => &queue_diff.diff().processed_to, } } } diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 67b62b19d..8b6e4d94f 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -109,13 +109,13 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI ) -> Result<()> { let time = std::time::Instant::now(); let len = diff.messages.len(); - let processed_upto = diff.processed_to.clone(); + let processed_to = diff.processed_to.clone(); self.queue.apply_diff(diff, block_id_short, hash, end_key)?; tracing::info!(target: tracing_targets::MQ_ADAPTER, new_messages_len = len, elapsed = ?time.elapsed(), - processed_upto = %DisplayIter(processed_upto.iter().map(DisplayTuple)), + processed_to = %DisplayIter(processed_to.iter().map(DisplayTuple)), "Diff applied", ); Ok(()) diff --git a/collator/src/types.rs b/collator/src/types.rs index 5eeca26d4..166da2c5f 100644 --- a/collator/src/types.rs +++ b/collator/src/types.rs @@ -215,13 +215,13 @@ pub struct McData { pub ref_mc_state_handle: RefMcStateHandle, - pub shards_processed_to: FastHashMap>, + pub shards_processed_to: FastHashMap, } impl McData { pub fn load_from_state( state_stuff: &ShardStateStuff, - shards_processed_to: FastHashMap>, + shards_processed_to: FastHashMap, ) -> Result> { let block_id = *state_stuff.block_id(); let extra = state_stuff.state_extra()?; @@ -287,7 +287,7 @@ pub struct BlockCandidate { pub block: BlockStuffAug, pub is_key_block: bool, pub prev_blocks_ids: Vec, - pub top_shard_blocks: Vec, + pub top_shard_blocks_ids: Vec, pub collated_data: Vec, pub collated_file_hash: HashBytes, pub chain_time: u64, @@ -438,7 +438,7 @@ pub struct TopBlockDescription { pub proof_funds: ProofFunds, #[cfg(feature = "block-creator-stats")] pub creators: Vec, - pub processed_to: FastHashMap, + pub processed_to: ProcessedTo, } #[derive(Debug)] @@ -623,7 +623,7 @@ impl std::fmt::Display for Display } } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] pub struct ShardDescriptionShort { pub ext_processed_to_anchor_id: u32, pub top_sc_block_updated: bool, @@ -680,5 +680,7 @@ where #[derive(Debug, Clone)] pub struct TopShardBlockInfo { pub block_id: BlockId, - pub processed_to: FastHashMap, + pub processed_to: ProcessedTo, } + +pub type ProcessedTo = BTreeMap; diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index aa71814a1..5d4137b54 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -400,7 +400,7 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { prev_hash: HashBytes::from([0x33; 32]), shard_ident: ShardIdent::MASTERCHAIN, seqno: 123, - processed_upto: BTreeMap::from([ + processed_to: BTreeMap::from([ (ShardIdent::MASTERCHAIN, QueueKey { lt: 1, hash: message1_hash, @@ -433,7 +433,7 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { let diff_with_messages = QueueDiffWithMessages::from_queue_diff(&queue_diff_stuff, &out_msg)?; - assert_eq!(diff_with_messages.processed_to, diff.processed_upto,); + assert_eq!(diff_with_messages.processed_to, diff.processed_to,); assert_eq!( diff_with_messages