From 7d54a43243905b62e1ced8f56cd5ad0575b8638b Mon Sep 17 00:00:00 2001
From: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Date: Thu, 30 Jan 2025 04:01:32 -0300
Subject: [PATCH] Make range sync chain Id sequential (#6868)

Currently, we set the `chain_id` of range sync chains to
`u64(hash(target_root, target_slot))`, which results in a long integer.

```
Jan 27 00:43:27.246 DEBG Batch downloaded, chain: 4223372036854775807, awaiting_batches: 0, batch_state: [p,E,E,E,E], blocks: 0, epoch: 0, service: range_sync
```

Instead, we can use `network_context.next_id()` as we do for all other
sync items and get a unique, sequential (not too big) integer as the id.

```
Jan 27 00:43:27.246 DEBG Batch downloaded, chain: 4, awaiting_batches: 0, batch_state: [p,E,E,E,E], blocks: 0, epoch: 0, service: range_sync
```

Also, if a specific chain for the same target is retried later, it won't
get the same ID, so we can more clearly differentiate the logs associated
with each attempt.
---
 .../network/src/sync/range_sync/chain.rs      | 19 +++++-------
 .../src/sync/range_sync/chain_collection.rs   | 31 +++++++++++--------
 2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 51d9d9da37f..4eb73f54839 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -15,7 +15,6 @@ use rand::seq::SliceRandom;
 use rand::Rng;
 use slog::{crit, debug, o, warn};
 use std::collections::{btree_map::Entry, BTreeMap, HashSet};
-use std::hash::{Hash, Hasher};
 use strum::IntoStaticStr;
 use types::{Epoch, EthSpec, Hash256, Slot};
 
@@ -56,7 +55,7 @@ pub enum RemoveChain {
 pub struct KeepChain;
 
 /// A chain identifier
-pub type ChainId = u64;
+pub type ChainId = Id;
 pub type BatchId = Epoch;
 
 #[derive(Debug, Copy, Clone, IntoStaticStr)]
@@ -127,14 +126,9 @@ pub enum ChainSyncingState {
 }
 
 impl<T: BeaconChainTypes> SyncingChain<T> {
-    pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 {
-        let mut hasher = std::collections::hash_map::DefaultHasher::new();
-        (target_root, target_slot).hash(&mut hasher);
-        hasher.finish()
-    }
-
     #[allow(clippy::too_many_arguments)]
     pub fn new(
+        id: Id,
         start_epoch: Epoch,
         target_head_slot: Slot,
         target_head_root: Hash256,
@@ -145,8 +139,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         let mut peers = FnvHashMap::default();
         peers.insert(peer_id, Default::default());
 
-        let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
-
         SyncingChain {
             id,
             chain_type,
@@ -165,6 +157,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         }
     }
 
+    /// Returns true if this chain has the same target
+    pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool {
+        self.target_head_slot == target_head_slot && self.target_head_root == target_head_root
+    }
+
     /// Check if the chain has peers from which to process batches.
     pub fn available_peers(&self) -> usize {
         self.peers.len()
@@ -1258,7 +1255,7 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
         serializer: &mut dyn slog::Serializer,
     ) -> slog::Result {
         use slog::Value;
-        serializer.emit_u64("id", self.id)?;
+        serializer.emit_u32("id", self.id)?;
         Value::serialize(&self.start_epoch, record, "from", serializer)?;
         Value::serialize(
             &self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()),
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index 16dadb36602..15bdf85e203 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -9,6 +9,7 @@ use crate::metrics;
 use crate::sync::network_context::SyncNetworkContext;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use fnv::FnvHashMap;
+use lighthouse_network::service::api_types::Id;
 use lighthouse_network::PeerId;
 use lighthouse_network::SyncInfo;
 use slog::{crit, debug, error};
@@ -29,9 +30,9 @@ const MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS: u64 = 10;
 #[derive(Clone)]
 pub enum RangeSyncState {
     /// A finalized chain is being synced.
-    Finalized(u64),
+    Finalized(Id),
     /// There are no finalized chains and we are syncing one more head chains.
-    Head(SmallVec<[u64; PARALLEL_HEAD_CHAINS]>),
+    Head(SmallVec<[Id; PARALLEL_HEAD_CHAINS]>),
     /// There are no head or finalized chains and no long range sync is in progress.
     Idle,
 }
@@ -74,7 +75,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
         if syncing_id == id {
             // the finalized chain that was syncing was removed
             debug_assert!(was_syncing && sync_type == RangeSyncType::Finalized);
-            let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self
+            let syncing_head_ids: SmallVec<[Id; PARALLEL_HEAD_CHAINS]> = self
                 .head_chains
                 .iter()
                 .filter(|(_id, chain)| chain.is_syncing())
@@ -355,7 +356,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
             .collect::<Vec<_>>();
         preferred_ids.sort_unstable();
 
-        let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new();
+        let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new();
         for (_, _, id) in preferred_ids {
             let chain = self.head_chains.get_mut(&id).expect("known chain");
             if syncing_chains.len() < PARALLEL_HEAD_CHAINS {
@@ -465,15 +466,17 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
         sync_type: RangeSyncType,
         network: &mut SyncNetworkContext<T>,
     ) {
-        let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
         let collection = if let RangeSyncType::Finalized = sync_type {
             &mut self.finalized_chains
         } else {
             &mut self.head_chains
         };
-        match collection.entry(id) {
-            Entry::Occupied(mut entry) => {
-                let chain = entry.get_mut();
+
+        match collection
+            .iter_mut()
+            .find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
+        {
+            Some((&id, chain)) => {
                 debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
                 debug_assert_eq!(chain.target_head_root, target_head_root);
                 debug_assert_eq!(chain.target_head_slot, target_head_slot);
@@ -483,13 +486,16 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                     } else {
                         error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
                     }
-                    let chain = entry.remove();
-                    self.on_chain_removed(&id, chain.is_syncing(), sync_type);
+                    let is_syncing = chain.is_syncing();
+                    collection.remove(&id);
+                    self.on_chain_removed(&id, is_syncing, sync_type);
                 }
             }
-            Entry::Vacant(entry) => {
+            None => {
                 let peer_rpr = peer.to_string();
+                let id = network.next_id();
                 let new_chain = SyncingChain::new(
+                    id,
                     start_epoch,
                     target_head_slot,
                     target_head_root,
@@ -497,9 +503,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                     sync_type.into(),
                     &self.log,
                 );
-                debug_assert_eq!(new_chain.get_id(), id);
                 debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
-                entry.insert(new_chain);
+                collection.insert(id, new_chain);
                 metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]);
                 self.update_metrics();
             }
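
For readers who want the change in a nutshell, below is a minimal, self-contained sketch (not part of the patch) of the id scheme it moves to: ids come from a sequential counter instead of `u64(hash(target_root, target_slot))`, and an existing chain is found by comparing targets directly (the role `has_same_target` plays above). `Counter` and `Chains` are hypothetical stand-ins for `SyncNetworkContext::next_id()` and `ChainCollection`, and the target is reduced to a slot plus a plain byte array.

```rust
use std::collections::HashMap;

/// Sequential ids, mirroring the `Id` alias the patch switches to
/// (a `u32`, as the `emit_u64` -> `emit_u32` change suggests).
type Id = u32;

/// Stand-in for `SyncNetworkContext::next_id()`: a plain counter.
#[derive(Default)]
struct Counter(Id);

impl Counter {
    /// Hands out small, sequential ids (1, 2, 3, ...) instead of a hash.
    fn next_id(&mut self) -> Id {
        self.0 += 1;
        self.0
    }
}

/// A syncing chain reduced to the fields that matter for identification.
struct Chain {
    id: Id,
    target_slot: u64,
    target_root: [u8; 32],
}

/// Stand-in for `ChainCollection`.
#[derive(Default)]
struct Chains {
    counter: Counter,
    chains: HashMap<Id, Chain>,
}

impl Chains {
    /// Look up an existing chain by comparing targets; only allocate a new
    /// sequential id when no chain with this target exists yet.
    fn add_or_create(&mut self, target_slot: u64, target_root: [u8; 32]) -> Id {
        if let Some(chain) = self
            .chains
            .values()
            .find(|c| c.target_slot == target_slot && c.target_root == target_root)
        {
            return chain.id;
        }
        let id = self.counter.next_id();
        self.chains.insert(
            id,
            Chain {
                id,
                target_slot,
                target_root,
            },
        );
        id
    }
}

fn main() {
    let mut chains = Chains::default();
    let root = [0xaa; 32];

    let first = chains.add_or_create(100, root);
    let again = chains.add_or_create(100, root);
    assert_eq!(first, again); // same target -> same chain, same id

    // If the chain is removed and the same target is retried later, it gets a
    // new (still small) id, so logs of the two attempts are distinguishable.
    chains.chains.remove(&first);
    let retried = chains.add_or_create(100, root);
    assert_ne!(first, retried);

    println!("first attempt id = {first}, retried id = {retried}");
}
```

As in the log excerpts from the commit message, ids stay small, and a chain retried for the same target after removal receives a fresh id, so the log lines of separate attempts are easy to tell apart.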