Skip to content

Commit

Permalink
fix select_parent error & optimize codes.
Browse files Browse the repository at this point in the history
Signed-off-by: baowj <[email protected]>
  • Loading branch information
baowj-678 committed Jan 22, 2025
1 parent b959e68 commit abf164d
Showing 1 changed file with 62 additions and 76 deletions.
138 changes: 62 additions & 76 deletions dragonfly-client/src/resource/parent_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,15 @@ use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_stream::StreamExt;
use tracing::{error, info, instrument, Instrument};
use tracing::{debug, error, info, instrument, Instrument};
use validator::HasLen;

#[allow(dead_code)]
const DEFAULT_AVAILABLE_CAPACITY: f64 = ByteSize::gb(10).as_u64() as f64;
const DEFAULT_AVAILABLE_CAPACITY: f64 = ByteSize::mb(10000 / 8).as_u64() as f64;

#[allow(dead_code)]
const DEFAULT_SYNC_HOST_TIMEOUT: u32 = 5;

/// Parent is used to control sync host thread.
#[derive(Clone)]
#[allow(dead_code)]
pub struct Parent {
/// parent is the CollectedParent to sync host.
parent: CollectedParent,

/// shutdown is used to stop sync host thread.
shutdown: Shutdown,
}

/// TaskParentSelector is used to store data to select parents for specific task.
#[derive(Clone)]
#[allow(dead_code)]
Expand All @@ -67,7 +56,7 @@ pub struct TaskParentSelector {
/// probability is the selection probability of different parents.
probability: Vec<f64>,

/// last_sync_time records the latest time for refreshing probability.
/// last_sync_time records the latest time for refreshing probability lazily.
last_sync_time: SystemTime,

/// sync_interval represents the time interval between two refreshing probability operations.
Expand All @@ -89,7 +78,7 @@ impl TaskParentSelector {
let mut parent_list: Vec<String> = Vec::new();
let mut probability: Vec<f64> = Vec::new();

let _ = collected_parents.iter().map(|parent| {
collected_parents.iter().for_each(|parent| {
parents.insert(parent.id.clone(), Host::default());
parent_list.push(parent.id.clone());
probability.push(1f64 / parents.len() as f64);
Expand Down Expand Up @@ -118,10 +107,10 @@ impl TaskParentSelector {
// Lazy refresh probability.
if now_time.duration_since(self.last_sync_time).unwrap() > self.sync_interval {
let mut parent_available_capacity = Vec::with_capacity(self.probability.len());
let mut count = 0;
let mut sum = 0f64;
let mut sum: f64 = 0f64;
let parent_map = self.parents.clone();

// Update parent host available capacity.
self.parent_list
.iter()
.for_each(|parent_id| match parent_map.get(parent_id) {
Expand All @@ -132,32 +121,26 @@ impl TaskParentSelector {
Ok(capacity) => {
parent_available_capacity.push(capacity);
sum += capacity;
count += 1;
}
Err(_) => {
parent_available_capacity.push(0f64);
}
},
});
// Calc average available capacity.
let mut avg = DEFAULT_AVAILABLE_CAPACITY;
if count != 0 {
avg = sum / count as f64;
}
// Calc sum.
sum += avg * (parent_available_capacity.len() - count) as f64;

// Prevent division by 0
sum += 0.1f64;
let count = self.parent_list.len();

// Update probability.
let _ = self.probability.iter_mut().enumerate().map(|(idx, p)| {
if parent_available_capacity[idx] == 0f64 {
*p = avg / sum;
} else {
*p = parent_available_capacity[idx] / sum;
}
});
self.probability
.iter_mut()
.enumerate()
.for_each(|(idx, p)| {
if sum > 0f64 {
*p = parent_available_capacity[idx] / sum;
} else {
*p = 1f64 / count as f64;
}
});
debug!("update probability to {:?}", self.probability);
// Reset last_sync_time.
self.last_sync_time = now_time;
}
Expand All @@ -177,7 +160,10 @@ impl TaskParentSelector {

/// available_capacity return the available capacity of the host.
fn available_capacity(host: Host) -> Result<f64> {
Ok(host.network.unwrap().upload_rate as f64)
match host.network {
None => Ok(DEFAULT_AVAILABLE_CAPACITY),
Some(network) => Ok(network.upload_rate as f64),
}
}
}

Expand All @@ -191,10 +177,10 @@ pub struct ParentSelector {
sync_interval: Duration,

/// tasks is the collector for all parent selection tasks.
tasks: DashMap<String, TaskParentSelector>,
selectors: Arc<DashMap<String, TaskParentSelector>>,

/// parent_cache is the lru cache to store sync host thread.
parent_cache: Arc<Mutex<LruCache<String, Parent>>>,
/// cache is the lru cache to store sync host thread.
cache: Arc<Mutex<LruCache<String, Shutdown>>>,

/// id_generator is a IDGenerator.
id_generator: Arc<IDGenerator>,
Expand All @@ -208,7 +194,7 @@ impl ParentSelector {
pub fn new(config: Arc<Config>, id_generator: Arc<IDGenerator>) -> ParentSelector {
let config = config.clone();
let sync_interval = config.download.parent_selector.sync_interval;
let tasks = DashMap::new();
let selectors = Arc::new(DashMap::new());
let parent_cache = LruCache::new(
NonZeroUsize::try_from(config.download.parent_selector.capacity).unwrap(),
);
Expand All @@ -217,62 +203,61 @@ impl ParentSelector {
ParentSelector {
config,
sync_interval,
tasks,
parent_cache: Arc::new(Mutex::new(parent_cache)),
selectors,
cache: Arc::new(Mutex::new(parent_cache)),
id_generator,
}
}

/// register_parents registers task and it's parents.
#[instrument(skip_all)]
pub fn register_parents(&self, task_id: String, add_parents: &Vec<CollectedParent>) {
// No parents.
if add_parents.length() == 0 {
// If not enable.
if !self.config.download.parent_selector.enable {
return;
}
// Get tasks
let tasks = self.tasks.clone();

// Add task
let task = TaskParentSelector::new(add_parents.clone(), self.sync_interval);
tasks.insert(task_id, task);

// If not enable.
if !self.config.download.parent_selector.enable {
// No parents, skip.
if add_parents.length() == 0 {
info!("register failed, parents length = 0");
return;
}
// Get all task parent selectors.
let selectors = self.selectors.clone();

// Add task parent selector.
let selector = TaskParentSelector::new(add_parents.clone(), self.sync_interval);
selectors.insert(task_id, selector);

// Get LRU cache.
let cache = self.parent_cache.clone();
let cache = self.cache.clone();
let cache = cache.lock();
let config = self.config.clone();

if let Ok(mut cache) = cache {
for parent in add_parents {
// already contains parent.id, move to head and skip.
if cache.get(&parent.id).is_some() {
continue;
}

// Create shutdown to control thread.
let shutdown = Shutdown::new();

// Create Parent
let new_parent = Parent {
parent: parent.clone(),
shutdown: shutdown.clone(),
};
if cache.len() == cache.cap().get() {
if let Some(element) = cache.pop_lru() {
// Stop popped thread.
element.1.shutdown.trigger();
}
// Push new parent to the LRU cache.
if let Some(old_shutdown) = cache.put(parent.id.clone(), shutdown.clone()) {
// Shutdown popped sync_host thread.
old_shutdown.trigger();
}
cache.push(parent.id.clone(), new_parent);

// Start new thread.
// Start new sync_host thread.
let config = config.clone();
let host_id = self.id_generator.host_id();
let peer_id = self.id_generator.peer_id();
let parent = parent.clone();
let tasks = self.tasks.clone();
let tasks = self.selectors.clone();
let shutdown = shutdown.clone();
let sync_host_timeout =
config.download.parent_selector.sync_interval * DEFAULT_SYNC_HOST_TIMEOUT;
let sync_host_timeout = self.sync_interval * DEFAULT_SYNC_HOST_TIMEOUT;
tokio::spawn(
async move {
let _ = Self::sync_host(
Expand All @@ -295,8 +280,9 @@ impl ParentSelector {
/// unregister_parents unregisters task.
#[instrument(skip_all)]
pub fn unregister_parents(&self, task_id: String) {
let tasks = self.tasks.clone();
tasks.remove(&task_id);
let selectors = self.selectors.clone();
// Remove this task parent selector.
selectors.remove(&task_id);
}

/// sync_host is a sub thread to sync host info from the parent.
Expand All @@ -307,7 +293,7 @@ impl ParentSelector {
host_id: String,
peer_id: String,
parent: CollectedParent,
tasks: DashMap<String, TaskParentSelector>,
selectors: Arc<DashMap<String, TaskParentSelector>>,
shutdown: Shutdown,
sync_host_timeout: Duration,
) -> Result<()> {
Expand Down Expand Up @@ -345,7 +331,7 @@ impl ParentSelector {
tokio::pin!(out_stream);

// Get tasks.
let tasks = tasks.clone();
let selectors = selectors.clone();
while let Some(message) = out_stream.try_next().await.or_err(ErrorType::StreamError)? {
// Check shutdown.
if shutdown.is_shutdown() {
Expand All @@ -354,9 +340,9 @@ impl ParentSelector {
// Deal with massage.
match message {
Ok(message) => {
// Update the parent host information for all tasks associated with this parent.
tasks.iter_mut().for_each(|task| {
if let Some(mut parent_info) = task.parents.get_mut(&parent.id) {
// Update the parent's host info for all selectors associated with this parent.
selectors.iter_mut().for_each(|selector| {
if let Some(mut parent_info) = selector.parents.get_mut(&parent.id) {
*parent_info = message.clone();
}
});
Expand All @@ -374,7 +360,7 @@ impl ParentSelector {
/// optimal_parent get optimal parent for the task.
#[instrument(skip_all)]
pub fn optimal_parent(&self, task_id: String) -> Result<String> {
let tasks = self.tasks.clone();
let tasks = self.selectors.clone();
match tasks.clone().get_mut(&task_id) {
None => Err(TaskNotFound(task_id)),
Some(mut task) => Ok(task.select_parent().clone()),
Expand Down

0 comments on commit abf164d

Please sign in to comment.