Skip to content

Commit

Permalink
chore: extend RouteProvider trait with routes_stats() (#629)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy authored Jan 27, 2025
1 parent a96022b commit c584390
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Added `CanisterInfo` to `MgmtMethod`.

* Extended `RouteProvider` trait with `fn routes_stats()`, returning the number of total and healthy routes.

## [0.39.2] - 2024-12-20

* Bumped `ic-certification` to `3.0.0`.
Expand Down
35 changes: 35 additions & 0 deletions ic-agent/src/agent/route_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ const ICP0_SUB_DOMAIN: &str = ".icp0.io";
const ICP_API_SUB_DOMAIN: &str = ".icp-api.io";
const LOCALHOST_SUB_DOMAIN: &str = ".localhost";

/// Statistical info about routing urls.
#[derive(Debug, PartialEq)]
pub struct RoutesStats {
/// Total number of existing routes (both healthy and unhealthy).
pub total: usize,

/// Number of currently healthy routes, or None if health status information is unavailable.
/// A healthy route is one that is available and ready to receive traffic.
/// The specific criteria for what constitutes a "healthy" route is implementation dependent.
pub healthy: Option<usize>,
}

impl RoutesStats {
/// Creates an new instance of [`RoutesStats`].
pub fn new(total: usize, healthy: Option<usize>) -> Self {
Self { total, healthy }
}
}

/// A [`RouteProvider`] for dynamic generation of routing urls.
pub trait RouteProvider: std::fmt::Debug + Send + Sync {
/// Generates the next routing URL based on the internal routing logic.
Expand All @@ -51,6 +70,9 @@ pub trait RouteProvider: std::fmt::Debug + Send + Sync {
/// appearing first. The returned vector can contain fewer than `n` URLs if
/// fewer are available.
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;

/// Returns statistics about the total number of existing routes and the number of healthy routes.
fn routes_stats(&self) -> RoutesStats;
}

/// A simple implementation of the [`RouteProvider`] which produces an even distribution of the urls from the input ones.
Expand Down Expand Up @@ -94,6 +116,10 @@ impl RouteProvider for RoundRobinRouteProvider {

Ok(urls)
}

fn routes_stats(&self) -> RoutesStats {
RoutesStats::new(self.routes.len(), None)
}
}

impl RoundRobinRouteProvider {
Expand Down Expand Up @@ -133,6 +159,9 @@ impl RouteProvider for Url {
fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
Ok(vec![self.route()?])
}
fn routes_stats(&self) -> RoutesStats {
RoutesStats::new(1, None)
}
}

/// A [`RouteProvider`] that will attempt to discover new boundary nodes and cycle through them, optionally prioritizing those with low latency.
Expand Down Expand Up @@ -215,6 +244,9 @@ impl RouteProvider for DynamicRouteProvider {
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
self.inner.n_ordered_routes(n)
}
fn routes_stats(&self) -> RoutesStats {
self.inner.routes_stats()
}
}

/// Strategy for [`DynamicRouteProvider`]'s routing mechanism.
Expand Down Expand Up @@ -270,6 +302,9 @@ impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
self.url.route()
}
}
fn routes_stats(&self) -> RoutesStats {
RoutesStats::new(1, None)
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
snapshot::routing_snapshot::RoutingSnapshot,
type_aliases::AtomicSwap,
},
RouteProvider,
RouteProvider, RoutesStats,
},
HttpService,
},
Expand Down Expand Up @@ -186,6 +186,11 @@ where
let urls = nodes.iter().map(|n| n.to_routing_url()).collect();
Ok(urls)
}

fn routes_stats(&self) -> RoutesStats {
let snapshot = self.routing_snapshot.load();
snapshot.routes_stats()
}
}

impl<S> DynamicRouteProvider<S>
Expand Down Expand Up @@ -291,7 +296,7 @@ mod tests {
assert_routed_domains, route_n_times, NodeHealthCheckerMock, NodesFetcherMock,
},
},
RouteProvider,
RouteProvider, RoutesStats,
},
Agent, AgentError,
};
Expand Down Expand Up @@ -417,6 +422,7 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain()], 6);
assert_eq!(route_provider.routes_stats(), RoutesStats::new(1, Some(1)));

// Test 2: multiple route() calls return 3 different domains with equal fairness (repetition).
// Two healthy nodes are added to the topology.
Expand All @@ -431,13 +437,15 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), RoutesStats::new(3, Some(3)));

// Test 3: multiple route() calls return 2 different domains with equal fairness (repetition).
// One node is set to unhealthy.
checker.overwrite_healthy_nodes(vec![node_1.clone(), node_3.clone()]);
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain(), node_3.domain()], 3);
assert_eq!(route_provider.routes_stats(), RoutesStats::new(3, Some(2)));

// Test 4: multiple route() calls return 3 different domains with equal fairness (repetition).
// Unhealthy node is set back to healthy.
Expand All @@ -449,6 +457,7 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), RoutesStats::new(3, Some(3)));

// Test 5: multiple route() calls return 3 different domains with equal fairness (repetition).
// One healthy node is added, but another one goes unhealthy.
Expand All @@ -467,6 +476,7 @@ mod tests {
vec![node_2.domain(), node_3.domain(), node_4.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), RoutesStats::new(4, Some(3)));

// Test 6: multiple route() calls return a single domain=api1.com.
// One node is set to unhealthy and one is removed from the topology.
Expand All @@ -475,6 +485,7 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_2.domain()], 3);
assert_eq!(route_provider.routes_stats(), RoutesStats::new(3, Some(1)));
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use std::{

use rand::Rng;

use crate::agent::route_provider::dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
use crate::agent::route_provider::{
dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
},
RoutesStats,
};

// Determines the size of the sliding window used for storing latencies and availabilities of nodes.
Expand Down Expand Up @@ -157,6 +160,7 @@ fn compute_score(
pub struct LatencyRoutingSnapshot {
nodes_with_metrics: Vec<NodeWithMetrics>,
existing_nodes: HashSet<Node>,
healthy_nodes: HashSet<Node>,
window_weights: Vec<f64>,
window_weights_sum: f64,
use_availability_penalty: bool,
Expand All @@ -174,6 +178,7 @@ impl LatencyRoutingSnapshot {
Self {
nodes_with_metrics: vec![],
existing_nodes: HashSet::new(),
healthy_nodes: HashSet::new(),
use_availability_penalty: true,
window_weights,
window_weights_sum,
Expand Down Expand Up @@ -302,6 +307,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
self.nodes_with_metrics.len() - 1
});

if health.is_healthy() {
self.healthy_nodes.insert(node.clone());
} else {
self.healthy_nodes.remove(node);
}

self.nodes_with_metrics[idx].add_latency_measurement(health.latency());

self.nodes_with_metrics[idx].score = compute_score(
Expand All @@ -314,6 +325,10 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {

true
}

fn routes_stats(&self) -> RoutesStats {
RoutesStats::new(self.existing_nodes.len(), Some(self.healthy_nodes.len()))
}
}

#[cfg(test)]
Expand All @@ -323,15 +338,18 @@ mod tests {
time::Duration,
};

use crate::agent::route_provider::dynamic_routing::{
health_check::HealthCheckStatus,
node::Node,
snapshot::{
latency_based_routing::{
compute_score, weighted_sample, LatencyRoutingSnapshot, NodeWithMetrics,
use crate::agent::route_provider::{
dynamic_routing::{
health_check::HealthCheckStatus,
node::Node,
snapshot::{
latency_based_routing::{
compute_score, weighted_sample, LatencyRoutingSnapshot, NodeWithMetrics,
},
routing_snapshot::RoutingSnapshot,
},
routing_snapshot::RoutingSnapshot,
},
RoutesStats,
};

#[test]
Expand All @@ -344,6 +362,7 @@ mod tests {
assert!(!snapshot.has_nodes());
assert!(snapshot.next_node().is_none());
assert!(snapshot.next_n_nodes(1).is_empty());
assert_eq!(snapshot.routes_stats(), RoutesStats::new(0, Some(0)));
}

#[test]
Expand All @@ -359,6 +378,7 @@ mod tests {
assert!(snapshot.nodes_with_metrics.is_empty());
assert!(!snapshot.has_nodes());
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.routes_stats(), RoutesStats::new(0, Some(0)));
}

#[test]
Expand All @@ -370,13 +390,15 @@ mod tests {
let node = Node::new("api1.com").unwrap();
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
snapshot.existing_nodes.insert(node.clone());
assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(0)));
// Check first update
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
assert!(snapshot.has_nodes());
let node_with_metrics = snapshot.nodes_with_metrics.first().unwrap();
assert_eq!(node_with_metrics.score, (2.0 / 1.0) / 2.0);
assert_eq!(snapshot.next_node().unwrap(), node);
assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(1)));
// Check second update
let health = HealthCheckStatus::new(Some(Duration::from_secs(2)));
let is_updated = snapshot.update_node(&node, health);
Expand All @@ -399,6 +421,7 @@ mod tests {
assert_eq!(snapshot.nodes_with_metrics.len(), 1);
assert_eq!(snapshot.existing_nodes.len(), 1);
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.routes_stats(), RoutesStats::new(1, Some(0)));
}

#[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::{
},
};

use crate::agent::route_provider::dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
use crate::agent::route_provider::{
dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
},
RoutesStats,
};

/// Routing snapshot, which samples nodes in a round-robin fashion.
Expand Down Expand Up @@ -106,6 +109,10 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
self.healthy_nodes.remove(node)
}
}

fn routes_stats(&self) -> RoutesStats {
RoutesStats::new(self.existing_nodes.len(), Some(self.healthy_nodes.len()))
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Debug;

use crate::agent::route_provider::dynamic_routing::{health_check::HealthCheckStatus, node::Node};
use crate::agent::route_provider::{
dynamic_routing::{health_check::HealthCheckStatus, node::Node},
RoutesStats,
};

/// A trait for interacting with the snapshot of nodes (routing table).
pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
Expand All @@ -15,4 +18,6 @@ pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
fn sync_nodes(&mut self, nodes: &[Node]) -> bool;
/// Updates the health status of a specific node, returning `true` if the node was found and updated.
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool;
/// Returns statistics about the routes (nodes).
fn routes_stats(&self) -> RoutesStats;
}

0 comments on commit c584390

Please sign in to comment.