From a88861258642f622019324374fae11f67336c81f Mon Sep 17 00:00:00 2001 From: Maksim Greshniakov Date: Fri, 29 Nov 2024 18:41:23 +0100 Subject: [PATCH] refactor(collator): renaming and refactoring --- collator/src/collator/do_collate/finalize.rs | 43 +++++++------- collator/src/collator/do_collate/mod.rs | 58 +++++++++---------- collator/src/collator/tests/collator_tests.rs | 2 +- .../collator/tests/execution_manager_tests.rs | 2 +- collator/src/collator/types.rs | 2 +- collator/src/internal_queue/iterator.rs | 4 +- collator/src/internal_queue/queue.rs | 18 +++--- .../src/internal_queue/state/session_state.rs | 5 +- collator/src/internal_queue/types.rs | 8 +-- collator/src/manager/blocks_cache.rs | 15 ++--- collator/src/manager/mod.rs | 19 +++--- collator/src/queue_adapter.rs | 2 +- collator/src/types.rs | 10 ++-- collator/tests/internal_queue.rs | 2 +- 14 files changed, 89 insertions(+), 101 deletions(-) diff --git a/collator/src/collator/do_collate/finalize.rs b/collator/src/collator/do_collate/finalize.rs index 5c2dc17d7..a3ae29c51 100644 --- a/collator/src/collator/do_collate/finalize.rs +++ b/collator/src/collator/do_collate/finalize.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; @@ -98,7 +98,7 @@ impl Phase { ) .with_processed_upto( diff_with_messages - .processed_upto + .processed_to .iter() .map(|(k, v)| (*k, v.lt, &v.hash)), ) @@ -109,7 +109,7 @@ impl Phase { ) .serialize(); - let processed_upto = diff_with_messages.processed_upto.clone(); + let diff_processed_to = diff_with_messages.processed_to.clone(); let queue_diff_hash = *queue_diff.hash(); tracing::debug!(target: tracing_targets::COLLATOR, queue_diff_hash = %queue_diff_hash); @@ -146,7 +146,7 @@ impl Phase { has_unprocessed_messages, diff_messages_len, create_queue_diff_elapsed, - processed_upto, + processed_to: diff_processed_to, }, update_queue_task, )) @@ -175,13 +175,16 @@ impl Phase { .finalize .clone(); - let mut processed_upto = FastHashMap::default(); - - for (shard_ident, processed_upto_stuff) in - &self.state.collation_data.processed_upto.internals - { - processed_upto.insert(*shard_ident, processed_upto_stuff.processed_to_msg); - } + let processed_to = self + .state + .collation_data + .processed_upto + .internals + .iter() + .map(|(shard_ident, processed_upto_stuff)| { + (*shard_ident, processed_upto_stuff.processed_to_msg) + }) + .collect(); let shard = self.state.collation_data.block_id_short.shard; @@ -590,7 +593,7 @@ impl Phase { self.state.collation_data.processed_upto.externals.as_ref(), ); - let mut shards_processed_upto = HashMap::default(); + let mut shards_processed_to = FastHashMap::default(); for (shard_id, shard_data) in shards.iter() { if !shard_data.top_sc_block_updated { @@ -598,25 +601,23 @@ impl Phase { } // Extract processed information for updated shards - let processed_upto = self + let shard_processed_to = self .state .collation_data .top_shard_blocks .iter() .find(|top_block_info| top_block_info.block_id.shard == *shard_id) - .map(|top_block_info| { - FastHashMap::from(top_block_info.processed_upto.clone()) - }) + .map(|top_block_info| top_block_info.processed_to.clone()) .or_else(|| { self.state .mc_data - .shards_processed_upto + .shards_processed_to .get(shard_id) .cloned() }); - if let Some(processed_upto) = processed_upto { - shards_processed_upto.insert(*shard_id, processed_upto); + if let Some(value) = shard_processed_to { + shards_processed_to.insert(*shard_id, value); } } @@ -639,7 +640,7 @@ impl Phase { processed_upto: self.state.collation_data.processed_upto.clone(), top_processed_to_anchor, ref_mc_state_handle: self.state.prev_shard_data.ref_mc_state_handle().clone(), - shards_processed_upto, + shards_processed_to, })) } }; @@ -671,7 +672,7 @@ impl Phase { || self.state.mc_data.consensus_info, |mcd| mcd.consensus_info, ), - processed_upto, + processed_to, }); let total_elapsed = histogram.finish(); diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index 45e2795e8..0603bf5f3 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -274,73 +274,71 @@ impl CollatorStdImpl { let (executor, mq_iterator_adapter, mq_adapter) = execution_wrapper.destruct(); - // state.mc_data.shards.first().unwrap().pr - let ( UpdateQueueDiffResult { queue_diff, has_unprocessed_messages, diff_messages_len, create_queue_diff_elapsed, - processed_upto, + processed_to, }, update_queue_task, ) = finalize_phase.update_queue_diff(mq_iterator_adapter, shard_id, mq_adapter.clone())?; let finalize_block_timer = std::time::Instant::now(); - let mut min_processed_upto: Option = None; + let mut min_processed_to: Option = None; // Get current and masterchain processed values - let current_processed_upto = processed_upto.get(&shard_id).cloned(); - let mc_processed_upto = mc_data.processed_upto.internals.get(&shard_id).cloned(); + let current_processed_to = processed_to.get(&shard_id).cloned(); + let mc_processed_to = mc_data.processed_upto.internals.get(&shard_id).cloned(); if shard_id.is_masterchain() { - // Iterate through shards to find the minimum processed value - for shard_processed_upto in mc_data.shards_processed_upto.values() { - if let Some(processed_upto) = shard_processed_upto.get(&shard_id) { - min_processed_upto = match min_processed_upto { - Some(current_min) => Some(current_min.min(*processed_upto)), - None => Some(*processed_upto), - }; + // Iterate through shards to find the minimum processed value from shards + for shard_processed_to in mc_data.shards_processed_to.values() { + if let Some(value) = shard_processed_to.get(&shard_id) { + min_processed_to = min_processed_to.map(|current_min| current_min.min(*value)); } } + // Combine with current and masterchain values - min_processed_upto = [current_processed_upto, min_processed_upto] + min_processed_to = [current_processed_to, min_processed_to] .into_iter() .flatten() .min(); } else { // Iterate through shards for non-masterchain - for (iter_shard_ident, shard_processed_upto) in &mc_data.shards_processed_upto { - let processed_upto = if iter_shard_ident == &shard_id { - current_processed_upto + // replace for current shard processed upto from current collation + for (iter_shard_ident, shard_processed_to) in &mc_data.shards_processed_to { + let shard_processed_to = if iter_shard_ident == &shard_id { + current_processed_to } else { - shard_processed_upto.get(&shard_id).cloned() + shard_processed_to.get(&shard_id).cloned() }; - if let Some(processed_upto) = processed_upto { - min_processed_upto = match min_processed_upto { - Some(current_min) => Some(current_min.min(processed_upto)), - None => Some(processed_upto), + // find minimum processed upto value + if let Some(value) = shard_processed_to { + min_processed_to = match min_processed_to { + Some(current_min) => Some(current_min.min(value)), + None => Some(value), }; } } // Combine with masterchain processed value - min_processed_upto = [ - min_processed_upto, - mc_processed_upto.map(|mc| mc.processed_to_msg), + min_processed_to = [ + min_processed_to, + mc_processed_to.map(|mc| mc.processed_to_msg), ] .into_iter() .flatten() .min(); } - if let Some(min_processed_upto) = min_processed_upto { - mq_adapter.trim_diffs(&shard_id, &min_processed_upto)?; + if let Some(value) = min_processed_to { + mq_adapter.trim_diffs(&shard_id, &value)?; }; - let diff_tail_len = mq_adapter.get_diff_count_by_shard(&shard_id) as u32; + let diff_tail_len = mq_adapter.get_diff_count_by_shard(&shard_id) as u32 + 1; let span = tracing::Span::current(); let (finalize_phase_result, update_queue_task_result) = rayon::join( @@ -900,7 +898,7 @@ impl CollatorStdImpl { proof_funds, #[cfg(feature = "block-creator-stats")] creators, - processed_upto, + processed_to, } = top_block_descr; let mut new_shard_descr = Box::new(ShardDescription::from_block_info( @@ -960,7 +958,7 @@ impl CollatorStdImpl { let top_shard_block_info = TopShardBlockInfo { block_id, - processed_upto, + processed_to, }; collation_data_builder diff --git a/collator/src/collator/tests/collator_tests.rs b/collator/src/collator/tests/collator_tests.rs index 5e9b8b1f5..8e06d5b49 100644 --- a/collator/src/collator/tests/collator_tests.rs +++ b/collator/src/collator/tests/collator_tests.rs @@ -316,7 +316,7 @@ fn test_get_anchors_processing_info() { consensus_info: Default::default(), top_processed_to_anchor: 0, ref_mc_state_handle: tracker.insert(0), - shards_processed_upto: Default::default(), + shards_processed_to: Default::default(), }; //------ diff --git a/collator/src/collator/tests/execution_manager_tests.rs b/collator/src/collator/tests/execution_manager_tests.rs index c8b6c1dc2..ff9e17e4f 100644 --- a/collator/src/collator/tests/execution_manager_tests.rs +++ b/collator/src/collator/tests/execution_manager_tests.rs @@ -238,7 +238,7 @@ fn gen_stub_working_state( }, top_processed_to_anchor: 0, ref_mc_state_handle: prev_shard_data.ref_mc_state_handle().clone(), - shards_processed_upto: Default::default(), + shards_processed_to: Default::default(), }), collation_config: Arc::new(Default::default()), wu_used_from_last_anchor: 0, diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 97a9814b4..196750acf 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -1279,7 +1279,7 @@ pub struct UpdateQueueDiffResult { pub has_unprocessed_messages: bool, pub diff_messages_len: usize, pub create_queue_diff_elapsed: Duration, - pub processed_upto: BTreeMap, + pub processed_to: BTreeMap, } pub struct FinalizedCollationResult { diff --git a/collator/src/internal_queue/iterator.rs b/collator/src/internal_queue/iterator.rs index 0fb5d9856..b337e5a37 100644 --- a/collator/src/internal_queue/iterator.rs +++ b/collator/src/internal_queue/iterator.rs @@ -130,7 +130,7 @@ impl QueueIterator for QueueIteratorImpl { // fill processed_upto for (shard_id, message_key) in self.last_processed_message.iter() { // TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator - diff.processed_upto.insert(*shard_id, *message_key); + diff.processed_to.insert(*shard_id, *message_key); } // move new messages @@ -162,7 +162,7 @@ impl QueueIterator for QueueIteratorImpl { for (shard_id, message_key) in self.last_processed_message.iter() { // TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator - diff.processed_upto.insert(*shard_id, *message_key); + diff.processed_to.insert(*shard_id, *message_key); } diff.messages = self.new_messages.clone(); diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 9134826a6..e6672399f 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -118,7 +118,7 @@ impl QueueFactory for QueueFactoryStdImpl { } struct ShortQueueDiff { - pub processed_upto: BTreeMap, + pub processed_to: BTreeMap, pub end_key: QueueKey, pub hash: HashBytes, } @@ -131,7 +131,6 @@ where { session_state: Arc, persistent_state: Arc

, - // diffs: FastDashMap>, session_diffs: FastDashMap>, persistent_diffs: FastDashMap>, gc: GcManager, @@ -206,7 +205,7 @@ where } let short_diff = ShortQueueDiff { - processed_upto: diff.processed_upto, + processed_to: diff.processed_to, end_key, hash: *hash, }; @@ -238,15 +237,15 @@ where *current_last_key = shard_diff.end_key; } - // find min processed_upto for each shard for GC + // find min processed_to for each shard for GC if *block_seqno == block_id_short.seqno && *top_shard_block_changed { - for processed_upto in shard_diff.processed_upto.iter() { + for (shard_ident, processed_to_key) in shard_diff.processed_to.iter() { let last_key = gc_ranges - .entry(*processed_upto.0) - .or_insert_with(|| *processed_upto.1); + .entry(*shard_ident) + .or_insert_with(|| *processed_to_key); - if processed_upto.1 < last_key { - *last_key = *processed_upto.1; + if processed_to_key < last_key { + *last_key = *processed_to_key; } } } @@ -295,6 +294,7 @@ where .persistent_diffs .get(shard_ident) .map_or(0, |diffs| diffs.len()); + session_count + persistent_count } diff --git a/collator/src/internal_queue/state/session_state.rs b/collator/src/internal_queue/state/session_state.rs index 35257b131..3bf162c2d 100644 --- a/collator/src/internal_queue/state/session_state.rs +++ b/collator/src/internal_queue/state/session_state.rs @@ -139,10 +139,7 @@ impl SessionState for SessionStateStdImpl { } fn commit_messages(&self, ranges: &FastHashMap) -> Result<()> { - let ranges = ranges - .iter() - .map(|(shard, key)| (*shard, *key)) - .collect::>(); + let ranges = ranges.iter().map(|(shard, key)| (*shard, *key)).collect(); self.storage.internal_queue_storage().commit(ranges) } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 2b2214722..c0a98d306 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -13,14 +13,14 @@ use super::state::state_iterator::MessageExt; #[derive(Default, Debug, Clone)] pub struct QueueDiffWithMessages { pub messages: BTreeMap>, - pub processed_upto: BTreeMap, + pub processed_to: BTreeMap, } impl QueueDiffWithMessages { pub fn new() -> Self { Self { messages: BTreeMap::new(), - processed_upto: BTreeMap::new(), + processed_to: BTreeMap::new(), } } } @@ -31,7 +31,7 @@ impl QueueDiffWithMessages { out_msg_description: &OutMsgDescr, ) -> Result { let QueueDiff { processed_upto, .. } = queue_diff_stuff.as_ref(); - let processed_upto: BTreeMap = processed_upto + let processed_to: BTreeMap = processed_upto .iter() .map(|(shard_ident, key)| (*shard_ident, *key)) .collect(); @@ -51,7 +51,7 @@ impl QueueDiffWithMessages { Ok(Self { messages, - processed_upto, + processed_to, }) } } diff --git a/collator/src/manager/blocks_cache.rs b/collator/src/manager/blocks_cache.rs index 0a620e47e..bf7c91798 100644 --- a/collator/src/manager/blocks_cache.rs +++ b/collator/src/manager/blocks_cache.rs @@ -44,11 +44,11 @@ impl BlocksCache { for mut shard_cache in self.shards.iter_mut() { for (_, entry) in shard_cache.blocks.iter().rev() { if entry.ref_by_mc_seqno == next_mc_block_id_short.seqno { - let processed_upto = entry + let processed_to = entry .int_processed_to() .iter() .map(|(shard, queue_key)| (*shard, *queue_key)) - .collect::>(); + .collect(); if let Some(additional_info) = entry.data.get_additional_shard_block_cache_info()? @@ -61,7 +61,7 @@ impl BlocksCache { proof_funds: std::mem::take(&mut shard_cache.data.proof_funds), #[cfg(feature = "block-creator-stats")] creators: std::mem::take(&mut shard_cache.data.creators), - processed_upto, + processed_to, }); break; } @@ -156,14 +156,11 @@ impl BlocksCache { ) }; - let mut processed_to = FastHashMap::default(); - - mc_block_entry + let processed_to = mc_block_entry .int_processed_to() .iter() - .for_each(|(shard, queue_key)| { - processed_to.insert(*shard, *queue_key); - }); + .map(|(shard, queue_key)| (*shard, *queue_key)) + .collect(); updated_top_shard_block_ids = mc_block_entry .top_shard_blocks_info diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index ae239f08e..4602f7a82 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -1415,21 +1415,16 @@ where Some(processed_to) => processed_to, None => { // try get from storage - let loaded = utils::load_only_queue_diff_stuff( + utils::load_only_queue_diff_stuff( self.state_node_adapter.as_ref(), &top_block_id, ) - .await?; - - let loaded = loaded.as_ref().processed_upto.clone(); - - let mut processed_to = FastHashMap::default(); - - for (shard_id, to_key) in loaded { - processed_to.insert(shard_id, to_key); - } - - processed_to + .await? + .as_ref() + .processed_upto + .clone() + .into_iter() + .collect() } }; diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 18e7dad03..ec8dc181e 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -109,7 +109,7 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI ) -> Result<()> { let time = std::time::Instant::now(); let len = diff.messages.len(); - let processed_upto = diff.processed_upto.clone(); + let processed_upto = diff.processed_to.clone(); self.queue.apply_diff(diff, block_id_short, hash, end_key)?; tracing::info!(target: tracing_targets::MQ_ADAPTER, diff --git a/collator/src/types.rs b/collator/src/types.rs index 85e5ac3b2..0a3363ed9 100644 --- a/collator/src/types.rs +++ b/collator/src/types.rs @@ -215,7 +215,7 @@ pub struct McData { pub ref_mc_state_handle: RefMcStateHandle, - pub shards_processed_upto: FastHashMap>, + pub shards_processed_to: FastHashMap>, } impl McData { @@ -263,7 +263,7 @@ impl McData { top_processed_to_anchor, ref_mc_state_handle: state_stuff.ref_mc_state_handle().clone(), - shards_processed_upto, + shards_processed_to: shards_processed_upto, })) } @@ -296,7 +296,7 @@ pub struct BlockCandidate { pub created_by: HashBytes, pub queue_diff_aug: QueueDiffStuffAug, pub consensus_info: ConsensusInfo, - pub processed_upto: FastHashMap, + pub processed_to: FastHashMap, } #[derive(Default, Clone)] @@ -438,7 +438,7 @@ pub struct TopBlockDescription { pub proof_funds: ProofFunds, #[cfg(feature = "block-creator-stats")] pub creators: Vec, - pub processed_upto: FastHashMap, + pub processed_to: FastHashMap, } #[derive(Debug)] @@ -680,5 +680,5 @@ where #[derive(Debug, Clone)] pub struct TopShardBlockInfo { pub block_id: BlockId, - pub processed_upto: FastHashMap, + pub processed_to: FastHashMap, } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index f0a0980b0..e789ab869 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -433,7 +433,7 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { let diff_with_messages = QueueDiffWithMessages::from_queue_diff(&queue_diff_stuff, &out_msg)?; - assert_eq!(diff_with_messages.processed_upto, diff.processed_upto,); + assert_eq!(diff_with_messages.processed_to, diff.processed_upto,); assert_eq!( diff_with_messages