diff --git a/chain/network/src/config.rs b/chain/network/src/config.rs
index 354943b60ae..4de3b4df690 100644
--- a/chain/network/src/config.rs
+++ b/chain/network/src/config.rs
@@ -5,6 +5,7 @@ use crate::network_protocol::PeerInfo;
 use crate::peer_manager::peer_manager_actor::Event;
 use crate::peer_manager::peer_store;
 use crate::sink::Sink;
+use crate::snapshot_hosts;
 use crate::stun;
 use crate::tcp;
 use crate::types::ROUTED_MESSAGE_TTL;
@@ -96,6 +97,7 @@ pub struct NetworkConfig {
     pub validator: Option<ValidatorConfig>,

     pub peer_store: peer_store::Config,
+    pub snapshot_hosts: snapshot_hosts::Config,
     pub whitelist_nodes: Vec<PeerInfo>,
     pub handshake_timeout: time::Duration,
@@ -285,6 +287,9 @@ impl NetworkConfig {
                 ban_window: cfg.ban_window.try_into()?,
                 peer_expiration_duration: cfg.peer_expiration_duration.try_into()?,
             },
+            snapshot_hosts: snapshot_hosts::Config {
+                snapshot_hosts_cache_size: cfg.snapshot_hosts_cache_size,
+            },
             whitelist_nodes: if cfg.whitelist_nodes.is_empty() {
                 vec![]
             } else {
@@ -367,6 +372,7 @@ impl NetworkConfig {
                 peer_expiration_duration: time::Duration::seconds(60 * 60),
                 connect_only_to_boot_nodes: false,
             },
+            snapshot_hosts: snapshot_hosts::Config { snapshot_hosts_cache_size: 1000 },
             whitelist_nodes: vec![],
             handshake_timeout: time::Duration::seconds(5),
             connect_to_reliable_peers_on_startup: true,
diff --git a/chain/network/src/config_json.rs b/chain/network/src/config_json.rs
index 4eeb4b8d117..a1c10453da0 100644
--- a/chain/network/src/config_json.rs
+++ b/chain/network/src/config_json.rs
@@ -51,6 +51,10 @@ fn default_monitor_peers_max_period() -> Duration {
 fn default_peer_states_cache_size() -> u32 {
     1000
 }
+/// Maximum number of snapshot hosts to keep in memory.
+fn default_snapshot_hosts_cache_size() -> u32 {
+    1000
+}
 /// Remove peers that we didn't hear about for this amount of time.
 fn default_peer_expiration_duration() -> Duration {
     Duration::from_secs(7 * 24 * 60 * 60)
 }
@@ -139,6 +143,9 @@ pub struct Config {
     /// Maximum number of peer states to keep in memory.
     #[serde(default = "default_peer_states_cache_size")]
     pub peer_states_cache_size: u32,
+    /// Maximum number of snapshot hosts to keep in memory.
+    #[serde(default = "default_snapshot_hosts_cache_size")]
+    pub snapshot_hosts_cache_size: u32,
     // Remove peers that were not active for this amount of time.
     #[serde(default = "default_peer_expiration_duration")]
     pub peer_expiration_duration: Duration,
@@ -296,6 +303,7 @@ impl Default for Config {
             handshake_timeout: Duration::from_secs(20),
             skip_sync_wait: false,
             peer_states_cache_size: default_peer_states_cache_size(),
+            snapshot_hosts_cache_size: default_snapshot_hosts_cache_size(),
             ban_window: Duration::from_secs(3 * 60 * 60),
             blacklist: vec![],
             ttl_account_id_router: default_ttl_account_id_router(),
diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index 6ee4e208afd..2d194ead212 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -189,7 +189,7 @@ impl NetworkState {
             tier1: connection::Pool::new(config.node_id()),
             inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)),
             peer_store,
-            snapshot_hosts: Arc::new(SnapshotHostsCache::new()),
+            snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())),
             connection_store: connection_store::ConnectionStore::new(store).unwrap(),
             pending_reconnect: Mutex::new(Vec::<PeerInfo>::new()),
             accounts_data: Arc::new(AccountDataCache::new()),
diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs
index 97b3ac76fa7..82e710f15f3 100644
--- a/chain/network/src/snapshot_hosts/mod.rs
+++ b/chain/network/src/snapshot_hosts/mod.rs
@@ -5,9 +5,10 @@
 //! in the network and stored locally in this cache.

 use crate::concurrency;
-use crate::concurrency::arc_mutex::ArcMutex;
 use crate::network_protocol::SnapshotHostInfo;
+use lru::LruCache;
 use near_primitives::network::PeerId;
+use parking_lot::Mutex;
 use rayon::iter::ParallelBridge;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -23,16 +24,23 @@ pub(crate) enum SnapshotHostInfoError {
     DuplicatePeerId,
 }

-/// TODO(saketh): Introduce a cache size limit
 #[derive(Clone)]
+pub struct Config {
+    /// The maximum number of SnapshotHostInfos to store locally.
+    /// At present this constraint is enforced using a simple
+    /// least-recently-used cache. In the future, we may wish to
+    /// implement something more sophisticated.
+    pub snapshot_hosts_cache_size: u32,
+}
+
 struct Inner {
     /// The latest known SnapshotHostInfo for each node in the network
-    hosts: im::HashMap<PeerId, Arc<SnapshotHostInfo>>,
+    hosts: LruCache<PeerId, Arc<SnapshotHostInfo>>,
 }

 impl Inner {
     fn is_new(&self, h: &SnapshotHostInfo) -> bool {
-        match self.hosts.get(&h.peer_id) {
+        match self.hosts.peek(&h.peer_id) {
             Some(old) if old.epoch_height >= h.epoch_height => false,
             _ => true,
         }
@@ -45,16 +53,17 @@ impl Inner {
         if !self.is_new(&d) {
             return None;
         }
-        self.hosts.insert(d.peer_id.clone(), d.clone());
+        self.hosts.push(d.peer_id.clone(), d.clone());
         Some(d)
     }
 }

-pub(crate) struct SnapshotHostsCache(ArcMutex<Inner>);
+pub(crate) struct SnapshotHostsCache(Mutex<Inner>);

 impl SnapshotHostsCache {
-    pub fn new() -> Self {
-        Self(ArcMutex::new(Inner { hosts: im::HashMap::new() }))
+    pub fn new(config: Config) -> Self {
+        let hosts = LruCache::new(config.snapshot_hosts_cache_size as usize);
+        Self(Mutex::new(Inner { hosts }))
     }

     /// Selects new data and verifies the signatures.
@@ -66,17 +75,19 @@ impl SnapshotHostsCache {
     ) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
         // Filter out any data which is outdated or which we already have.
         let mut new_data = HashMap::new();
-        let inner = self.0.load();
-        for d in data {
-            // Sharing multiple entries for the same peer is considered malicious,
-            // since all but one are obviously outdated.
-            if new_data.contains_key(&d.peer_id) {
-                return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
-            }
-            // It is fine to broadcast data we already know about.
-            // It is fine to broadcast data which we know to be outdated.
-            if inner.is_new(&d) {
-                new_data.insert(d.peer_id.clone(), d);
+        {
+            let inner = self.0.lock();
+            for d in data {
+                // Sharing multiple entries for the same peer is considered malicious,
+                // since all but one are obviously outdated.
+                if new_data.contains_key(&d.peer_id) {
+                    return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
+                }
+                // It is fine to broadcast data we already know about.
+                // It is fine to broadcast data which we know to be outdated.
+                if inner.is_new(&d) {
+                    new_data.insert(d.peer_id.clone(), d);
+                }
             }
         }

@@ -99,22 +110,24 @@ impl SnapshotHostsCache {
     /// Returns the data inserted and optionally a verification error.
     /// WriteLock is acquired only for the final update (after verification).
     pub async fn insert(
-        self: &Arc<Self>,
+        self: &Self,
         data: Vec<Arc<SnapshotHostInfo>>,
     ) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
-        let this = self.clone();
         // Execute verification on the rayon threadpool.
-        let (data, err) = this.verify(data).await;
+        let (data, err) = self.verify(data).await;
         // Insert the successfully verified data, even if an error has been encountered.
-        let inserted = self.0.update(|mut inner| {
-            let inserted = data.into_iter().filter_map(|d| inner.try_insert(d)).collect();
-            (inserted, inner)
-        });
+        let mut newly_inserted_data: Vec<Arc<SnapshotHostInfo>> = vec![];
+        let mut inner = self.0.lock();
+        for d in data {
+            if let Some(inserted) = inner.try_insert(d) {
+                newly_inserted_data.push(inserted);
+            }
+        }
         // Return the inserted data.
-        (inserted, err)
+        (newly_inserted_data, err)
     }

     pub fn get_hosts(&self) -> Vec<Arc<SnapshotHostInfo>> {
-        self.0.load().hosts.values().cloned().collect()
+        self.0.lock().hosts.iter().map(|(_, v)| v.clone()).collect()
     }
 }
diff --git a/chain/network/src/snapshot_hosts/tests.rs b/chain/network/src/snapshot_hosts/tests.rs
index d0f64f6face..a36e859e47e 100644
--- a/chain/network/src/snapshot_hosts/tests.rs
+++ b/chain/network/src/snapshot_hosts/tests.rs
@@ -1,5 +1,5 @@
 use crate::network_protocol::testonly as data;
-use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache};
+use crate::snapshot_hosts::{Config, SnapshotHostInfoError, SnapshotHostsCache};
 use crate::testonly::assert_is_superset;
 use crate::testonly::{make_rng, AsSet as _};
 use crate::types::SnapshotHostInfo;
@@ -45,7 +45,8 @@ async fn happy_path() {
     let peer1 = PeerId::new(key1.public_key());
     let peer2 = PeerId::new(key2.public_key());

-    let cache = Arc::new(SnapshotHostsCache::new());
+    let config = Config { snapshot_hosts_cache_size: 100 };
+    let cache = SnapshotHostsCache::new(config);
     assert_eq!(cache.get_hosts().len(), 0); // initially empty

     // initial insert
@@ -79,7 +80,9 @@ async fn invalid_signature() {
     let peer0 = PeerId::new(key0.public_key());
     let peer1 = PeerId::new(key1.public_key());

-    let cache = Arc::new(SnapshotHostsCache::new());
+    let config = Config { snapshot_hosts_cache_size: 100 };
+    let cache = SnapshotHostsCache::new(config);
+
     let info0_invalid_sig = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key1));
     let info1 = Arc::new(make_snapshot_host_info(&peer1, 1, vec![0, 1, 2, 3], &key1));
     let res = cache.insert(vec![info0_invalid_sig.clone(), info1.clone()]).await;
@@ -102,7 +105,8 @@ async fn duplicate_peer_id() {
     let key0 = data::make_secret_key(rng);
     let peer0 = PeerId::new(key0.public_key());

-    let cache = Arc::new(SnapshotHostsCache::new());
+    let config = Config { snapshot_hosts_cache_size: 100 };
+    let cache = SnapshotHostsCache::new(config);

     let info00 = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key0));
     let info01 = Arc::new(make_snapshot_host_info(&peer0, 2, vec![0, 3], &key0));
@@ -113,3 +117,40 @@ async fn duplicate_peer_id() {
     // no partial data is stored
     assert_eq!(0, cache.get_hosts().len());
 }
+
+#[tokio::test]
+async fn test_lru_eviction() {
+    init_test_logger();
+    let mut rng = make_rng(2947294234);
+    let rng = &mut rng;
+
+    let key0 = data::make_secret_key(rng);
+    let key1 = data::make_secret_key(rng);
+    let key2 = data::make_secret_key(rng);
+
+    let peer0 = PeerId::new(key0.public_key());
+    let peer1 = PeerId::new(key1.public_key());
+    let peer2 = PeerId::new(key2.public_key());
+
+    let config = Config { snapshot_hosts_cache_size: 2 };
+    let cache = SnapshotHostsCache::new(config);
+
+    // initial inserts to capacity
+    let info0 = Arc::new(make_snapshot_host_info(&peer0, 123, vec![0, 1, 2, 3], &key0));
+    let res = cache.insert(vec![info0.clone()]).await;
+    assert_eq!([&info0].as_set(), unwrap(&res).as_set());
+    assert_eq!([&info0].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+
+    let info1 = Arc::new(make_snapshot_host_info(&peer1, 123, vec![2], &key1));
+    let res = cache.insert(vec![info1.clone()]).await;
+    assert_eq!([&info1].as_set(), unwrap(&res).as_set());
+    assert_eq!([&info0, &info1].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+
+    // insert past capacity
+    let info2 = Arc::new(make_snapshot_host_info(&peer2, 123, vec![1, 3], &key2));
+    let res = cache.insert(vec![info2.clone()]).await;
+    // check that the new data is accepted
+    assert_eq!([&info2].as_set(), unwrap(&res).as_set());
+    // check that the oldest data was evicted
+    assert_eq!([&info1, &info2].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+}
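Reviewer note, not part of the patch: the eviction behavior exercised by `test_lru_eviction` above comes straight from `lru::LruCache`. The sketch below is a minimal, self-contained illustration of the three calls the patch relies on (`new`, `push`, `peek`), assuming the same `lru` crate version as the patch, where `LruCache::new` takes a plain `usize` capacity; the integer keys and values are placeholders standing in for `PeerId` and `Arc<SnapshotHostInfo>`.

```rust
use lru::LruCache;

fn main() {
    // Capacity 2 mirrors `snapshot_hosts_cache_size: 2` in test_lru_eviction.
    let mut hosts: LruCache<u32, u64> = LruCache::new(2);

    // `push` inserts an entry and evicts the least recently used one once the
    // cache is full; Inner::try_insert calls this where it used to call
    // im::HashMap::insert.
    hosts.push(0, 123);
    hosts.push(1, 123);

    // `peek` reads an entry without promoting it, so a freshness check does
    // not change the eviction order; it also works through `&self`, which is
    // all Inner::is_new has (LruCache::get would require `&mut self`).
    assert_eq!(hosts.peek(&0), Some(&123));

    // A third insert exceeds the capacity and evicts key 0, the least
    // recently used entry (the `peek` above did not refresh it).
    hosts.push(2, 123);
    assert!(hosts.peek(&0).is_none());
    assert_eq!(hosts.len(), 2);
}
```

A plausible reading of the `ArcMutex` to `parking_lot::Mutex` switch: `ArcMutex::load` handed out cheap snapshots of the persistent `im::HashMap`, but an `LruCache` is not cheap to clone, so both the readers (`verify`, `get_hosts`) and the writer (`insert`) now take the same lock.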