From abf164d46155a8fa72bdb4af3e8fd826d29d60b1 Mon Sep 17 00:00:00 2001 From: baowj Date: Wed, 22 Jan 2025 22:08:56 +0800 Subject: [PATCH] fix select_parent error & optimize codes. Signed-off-by: baowj --- .../src/resource/parent_selector.rs | 138 ++++++++---------- 1 file changed, 62 insertions(+), 76 deletions(-) diff --git a/dragonfly-client/src/resource/parent_selector.rs b/dragonfly-client/src/resource/parent_selector.rs index 749fba2d..75add305 100644 --- a/dragonfly-client/src/resource/parent_selector.rs +++ b/dragonfly-client/src/resource/parent_selector.rs @@ -34,26 +34,15 @@ use std::num::NonZeroUsize; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio_stream::StreamExt; -use tracing::{error, info, instrument, Instrument}; +use tracing::{debug, error, info, instrument, Instrument}; use validator::HasLen; #[allow(dead_code)] -const DEFAULT_AVAILABLE_CAPACITY: f64 = ByteSize::gb(10).as_u64() as f64; +const DEFAULT_AVAILABLE_CAPACITY: f64 = ByteSize::mb(10000 / 8).as_u64() as f64; #[allow(dead_code)] const DEFAULT_SYNC_HOST_TIMEOUT: u32 = 5; -/// Parent is used to control sync host thread. -#[derive(Clone)] -#[allow(dead_code)] -pub struct Parent { - /// parent is the CollectedParent to sync host. - parent: CollectedParent, - - /// shutdown is used to stop sync host thread. - shutdown: Shutdown, -} - /// TaskParentSelector is used to store data to select parents for specific task. #[derive(Clone)] #[allow(dead_code)] @@ -67,7 +56,7 @@ pub struct TaskParentSelector { /// probability is the selection probability of different parents. probability: Vec, - /// last_sync_time records the latest time for refreshing probability. + /// last_sync_time records the latest time for refreshing probability lazily. last_sync_time: SystemTime, /// sync_interval represents the time interval between two refreshing probability operations. @@ -89,7 +78,7 @@ impl TaskParentSelector { let mut parent_list: Vec = Vec::new(); let mut probability: Vec = Vec::new(); - let _ = collected_parents.iter().map(|parent| { + collected_parents.iter().for_each(|parent| { parents.insert(parent.id.clone(), Host::default()); parent_list.push(parent.id.clone()); probability.push(1f64 / parents.len() as f64); @@ -118,10 +107,10 @@ impl TaskParentSelector { // Lazy refresh probability. if now_time.duration_since(self.last_sync_time).unwrap() > self.sync_interval { let mut parent_available_capacity = Vec::with_capacity(self.probability.len()); - let mut count = 0; - let mut sum = 0f64; + let mut sum: f64 = 0f64; let parent_map = self.parents.clone(); + // Update parent host available capacity. self.parent_list .iter() .for_each(|parent_id| match parent_map.get(parent_id) { @@ -132,32 +121,26 @@ impl TaskParentSelector { Ok(capacity) => { parent_available_capacity.push(capacity); sum += capacity; - count += 1; } Err(_) => { parent_available_capacity.push(0f64); } }, }); - // Calc average available capacity. - let mut avg = DEFAULT_AVAILABLE_CAPACITY; - if count != 0 { - avg = sum / count as f64; - } - // Calc sum. - sum += avg * (parent_available_capacity.len() - count) as f64; - - // Prevent division by 0 - sum += 0.1f64; + let count = self.parent_list.len(); // Update probability. - let _ = self.probability.iter_mut().enumerate().map(|(idx, p)| { - if parent_available_capacity[idx] == 0f64 { - *p = avg / sum; - } else { - *p = parent_available_capacity[idx] / sum; - } - }); + self.probability + .iter_mut() + .enumerate() + .for_each(|(idx, p)| { + if sum > 0f64 { + *p = parent_available_capacity[idx] / sum; + } else { + *p = 1f64 / count as f64; + } + }); + debug!("update probability to {:?}", self.probability); // Reset last_sync_time. self.last_sync_time = now_time; } @@ -177,7 +160,10 @@ impl TaskParentSelector { /// available_capacity return the available capacity of the host. fn available_capacity(host: Host) -> Result { - Ok(host.network.unwrap().upload_rate as f64) + match host.network { + None => Ok(DEFAULT_AVAILABLE_CAPACITY), + Some(network) => Ok(network.upload_rate as f64), + } } } @@ -191,10 +177,10 @@ pub struct ParentSelector { sync_interval: Duration, /// tasks is the collector for all parent selection tasks. - tasks: DashMap, + selectors: Arc>, - /// parent_cache is the lru cache to store sync host thread. - parent_cache: Arc>>, + /// cache is the lru cache to store sync host thread. + cache: Arc>>, /// id_generator is a IDGenerator. id_generator: Arc, @@ -208,7 +194,7 @@ impl ParentSelector { pub fn new(config: Arc, id_generator: Arc) -> ParentSelector { let config = config.clone(); let sync_interval = config.download.parent_selector.sync_interval; - let tasks = DashMap::new(); + let selectors = Arc::new(DashMap::new()); let parent_cache = LruCache::new( NonZeroUsize::try_from(config.download.parent_selector.capacity).unwrap(), ); @@ -217,8 +203,8 @@ impl ParentSelector { ParentSelector { config, sync_interval, - tasks, - parent_cache: Arc::new(Mutex::new(parent_cache)), + selectors, + cache: Arc::new(Mutex::new(parent_cache)), id_generator, } } @@ -226,53 +212,52 @@ impl ParentSelector { /// register_parents registers task and it's parents. #[instrument(skip_all)] pub fn register_parents(&self, task_id: String, add_parents: &Vec) { - // No parents. - if add_parents.length() == 0 { + // If not enable. + if !self.config.download.parent_selector.enable { return; } - // Get tasks - let tasks = self.tasks.clone(); - // Add task - let task = TaskParentSelector::new(add_parents.clone(), self.sync_interval); - tasks.insert(task_id, task); - - // If not enable. - if !self.config.download.parent_selector.enable { + // No parents, skip. + if add_parents.length() == 0 { + info!("register failed, parents length = 0"); return; } + // Get all task parent selectors. + let selectors = self.selectors.clone(); + + // Add task parent selector. + let selector = TaskParentSelector::new(add_parents.clone(), self.sync_interval); + selectors.insert(task_id, selector); // Get LRU cache. - let cache = self.parent_cache.clone(); + let cache = self.cache.clone(); let cache = cache.lock(); let config = self.config.clone(); if let Ok(mut cache) = cache { for parent in add_parents { + // already contains parent.id, move to head and skip. + if cache.get(&parent.id).is_some() { + continue; + } + + // Create shutdown to control thread. let shutdown = Shutdown::new(); - // Create Parent - let new_parent = Parent { - parent: parent.clone(), - shutdown: shutdown.clone(), - }; - if cache.len() == cache.cap().get() { - if let Some(element) = cache.pop_lru() { - // Stop popped thread. - element.1.shutdown.trigger(); - } + // Push new parent to the LRU cache. + if let Some(old_shutdown) = cache.put(parent.id.clone(), shutdown.clone()) { + // Shutdown popped sync_host thread. + old_shutdown.trigger(); } - cache.push(parent.id.clone(), new_parent); - // Start new thread. + // Start new sync_host thread. let config = config.clone(); let host_id = self.id_generator.host_id(); let peer_id = self.id_generator.peer_id(); let parent = parent.clone(); - let tasks = self.tasks.clone(); + let tasks = self.selectors.clone(); let shutdown = shutdown.clone(); - let sync_host_timeout = - config.download.parent_selector.sync_interval * DEFAULT_SYNC_HOST_TIMEOUT; + let sync_host_timeout = self.sync_interval * DEFAULT_SYNC_HOST_TIMEOUT; tokio::spawn( async move { let _ = Self::sync_host( @@ -295,8 +280,9 @@ impl ParentSelector { /// unregister_parents unregisters task. #[instrument(skip_all)] pub fn unregister_parents(&self, task_id: String) { - let tasks = self.tasks.clone(); - tasks.remove(&task_id); + let selectors = self.selectors.clone(); + // Remove this task parent selector. + selectors.remove(&task_id); } /// sync_host is a sub thread to sync host info from the parent. @@ -307,7 +293,7 @@ impl ParentSelector { host_id: String, peer_id: String, parent: CollectedParent, - tasks: DashMap, + selectors: Arc>, shutdown: Shutdown, sync_host_timeout: Duration, ) -> Result<()> { @@ -345,7 +331,7 @@ impl ParentSelector { tokio::pin!(out_stream); // Get tasks. - let tasks = tasks.clone(); + let selectors = selectors.clone(); while let Some(message) = out_stream.try_next().await.or_err(ErrorType::StreamError)? { // Check shutdown. if shutdown.is_shutdown() { @@ -354,9 +340,9 @@ impl ParentSelector { // Deal with massage. match message { Ok(message) => { - // Update the parent host information for all tasks associated with this parent. - tasks.iter_mut().for_each(|task| { - if let Some(mut parent_info) = task.parents.get_mut(&parent.id) { + // Update the parent's host info for all selectors associated with this parent. + selectors.iter_mut().for_each(|selector| { + if let Some(mut parent_info) = selector.parents.get_mut(&parent.id) { *parent_info = message.clone(); } }); @@ -374,7 +360,7 @@ impl ParentSelector { /// optimal_parent get optimal parent for the task. #[instrument(skip_all)] pub fn optimal_parent(&self, task_id: String) -> Result { - let tasks = self.tasks.clone(); + let tasks = self.selectors.clone(); match tasks.clone().get_mut(&task_id) { None => Err(TaskNotFound(task_id)), Some(mut task) => Ok(task.select_parent().clone()),