chore: extend RouteProvider trait with routes_stats() #629

Open
wants to merge 3 commits into
base: main
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Added `CanisterInfo` to `MgmtMethod`.

* Extended `RouteProvider` trait with `fn routes_stats()`, returning the number of total and healthy routes as a tuple.

## [0.39.2] - 2024-12-20

* Bumped `ic-certification` to `3.0.0`.
22 changes: 22 additions & 0 deletions ic-agent/src/agent/route_provider.rs
@@ -51,6 +51,15 @@ pub trait RouteProvider: std::fmt::Debug + Send + Sync {
/// appearing first. The returned vector can contain fewer than `n` URLs if
/// fewer are available.
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;

/// Returns the total number of routes and healthy routes as a tuple.
///
/// - First element is the total number of routes available (both healthy and unhealthy)
/// - Second element is the number of currently healthy routes, or None if health status information is unavailable
///
/// A healthy route is one that is available and ready to receive traffic.
/// The specific criteria for what constitutes a "healthy" route is implementation dependent.
fn routes_stats(&self) -> (usize, Option<usize>);

Suggest using a type rather than a tuple

Suggested change
fn routes_stats(&self) -> (usize, Option<usize>);
fn routes_stats(&self) -> RoutesStats;

along with something like this

pub struct RoutesStats {
    /// total number of routes available (both healthy and unhealthy)
    pub total: usize,

    /// number of currently healthy routes, or None if health status information is unavailable
    ///
    /// A healthy route is one that is available and ready to receive traffic.
    /// The specific criteria for what constitutes a "healthy" route is implementation dependent.
    pub healthy: Option<usize>,
}
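
A slightly fuller sketch of the named-type suggestion may help show how call sites would read. It assumes a simplified stand-in trait with only the new method (the real `RouteProvider` has other methods and bounds), and `StaticRoutes` and `RoutesStats::new` are hypothetical names used only for illustration:

```rust
/// Named return type in place of the `(usize, Option<usize>)` tuple.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RoutesStats {
    /// Total number of routes available (both healthy and unhealthy).
    pub total: usize,
    /// Number of currently healthy routes, or `None` if health status is unavailable.
    pub healthy: Option<usize>,
}

impl RoutesStats {
    /// Hypothetical convenience constructor (not part of the PR).
    pub fn new(total: usize, healthy: Option<usize>) -> Self {
        Self { total, healthy }
    }
}

/// Simplified stand-in for the trait in the diff; only the new method is shown.
pub trait RouteProvider {
    fn routes_stats(&self) -> RoutesStats;
}

/// Hypothetical provider with a fixed route list and no health checks.
pub struct StaticRoutes {
    pub routes: Vec<String>,
}

impl RouteProvider for StaticRoutes {
    fn routes_stats(&self) -> RoutesStats {
        // No health information is tracked here, so `healthy` is `None`.
        RoutesStats::new(self.routes.len(), None)
    }
}
```

With named fields, a caller writes `stats.total` and `stats.healthy` rather than `stats.0` and `stats.1`, and fields can be added later without changing every destructuring site.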

}

/// A simple implementation of the [`RouteProvider`] which produces an even distribution of the urls from the input ones.
@@ -94,6 +103,10 @@ impl RouteProvider for RoundRobinRouteProvider {

Ok(urls)
}

fn routes_stats(&self) -> (usize, Option<usize>) {
(self.routes.len(), None)
}
}

impl RoundRobinRouteProvider {
@@ -133,6 +146,9 @@ impl RouteProvider for Url {
fn n_ordered_routes(&self, _: usize) -> Result<Vec<Url>, AgentError> {
Ok(vec![self.route()?])
}
fn routes_stats(&self) -> (usize, Option<usize>) {
(1, None)
}
}

/// A [`RouteProvider`] that will attempt to discover new boundary nodes and cycle through them, optionally prioritizing those with low latency.
@@ -215,6 +231,9 @@ impl RouteProvider for DynamicRouteProvider {
fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError> {
self.inner.n_ordered_routes(n)
}
fn routes_stats(&self) -> (usize, Option<usize>) {
self.inner.routes_stats()
}
}

/// Strategy for [`DynamicRouteProvider`]'s routing mechanism.
@@ -270,6 +289,9 @@ impl<R: RouteProvider> RouteProvider for UrlUntilReady<R> {
self.url.route()
}
}
fn routes_stats(&self) -> (usize, Option<usize>) {
(1, None)
}
}

#[cfg(test)]
Original file line number Diff line number Diff line change
@@ -186,6 +186,11 @@ where
let urls = nodes.iter().map(|n| n.to_routing_url()).collect();
Ok(urls)
}

fn routes_stats(&self) -> (usize, Option<usize>) {
let snapshot = self.routing_snapshot.load();
snapshot.nodes_stats()
}
}

impl<S> DynamicRouteProvider<S>
@@ -417,6 +422,7 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain()], 6);
assert_eq!(route_provider.routes_stats(), (1, Some(1)));

// Test 2: multiple route() calls return 3 different domains with equal fairness (repetition).
// Two healthy nodes are added to the topology.
@@ -431,13 +437,15 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), (3, Some(3)));

// Test 3: multiple route() calls return 2 different domains with equal fairness (repetition).
// One node is set to unhealthy.
checker.overwrite_healthy_nodes(vec![node_1.clone(), node_3.clone()]);
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain(), node_3.domain()], 3);
assert_eq!(route_provider.routes_stats(), (3, Some(2)));

// Test 4: multiple route() calls return 3 different domains with equal fairness (repetition).
// Unhealthy node is set back to healthy.
@@ -449,6 +457,7 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), (3, Some(3)));

// Test 5: multiple route() calls return 3 different domains with equal fairness (repetition).
// One healthy node is added, but another one goes unhealthy.
@@ -467,6 +476,7 @@ mod tests {
vec![node_2.domain(), node_3.domain(), node_4.domain()],
2,
);
assert_eq!(route_provider.routes_stats(), (4, Some(3)));

// Test 6: multiple route() calls return a single domain=api1.com.
// One node is set to unhealthy and one is removed from the topology.
@@ -475,6 +485,7 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_2.domain()], 3);
assert_eq!(route_provider.routes_stats(), (3, Some(1)));
}

#[tokio::test]
Original file line number Diff line number Diff line change
@@ -157,6 +157,7 @@ fn compute_score(
pub struct LatencyRoutingSnapshot {
nodes_with_metrics: Vec<NodeWithMetrics>,
existing_nodes: HashSet<Node>,
healthy_nodes: HashSet<Node>,
window_weights: Vec<f64>,
window_weights_sum: f64,
use_availability_penalty: bool,
@@ -174,6 +175,7 @@ impl LatencyRoutingSnapshot {
Self {
nodes_with_metrics: vec![],
existing_nodes: HashSet::new(),
healthy_nodes: HashSet::new(),
use_availability_penalty: true,
window_weights,
window_weights_sum,
@@ -302,6 +304,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
self.nodes_with_metrics.len() - 1
});

if health.is_healthy() {
self.healthy_nodes.insert(node.clone());
} else {
self.healthy_nodes.remove(node);
}

self.nodes_with_metrics[idx].add_latency_measurement(health.latency());

self.nodes_with_metrics[idx].score = compute_score(
@@ -314,6 +322,10 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {

true
}

fn nodes_stats(&self) -> (usize, Option<usize>) {
(self.existing_nodes.len(), Some(self.healthy_nodes.len()))
}
}

#[cfg(test)]
@@ -344,6 +356,7 @@ mod tests {
assert!(!snapshot.has_nodes());
assert!(snapshot.next_node().is_none());
assert!(snapshot.next_n_nodes(1).is_empty());
assert_eq!(snapshot.nodes_stats(), (0, Some(0)));
}

#[test]
@@ -359,6 +372,7 @@ mod tests {
assert!(snapshot.nodes_with_metrics.is_empty());
assert!(!snapshot.has_nodes());
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.nodes_stats(), (0, Some(0)));
}

#[test]
@@ -370,13 +384,15 @@ mod tests {
let node = Node::new("api1.com").unwrap();
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
snapshot.existing_nodes.insert(node.clone());
assert_eq!(snapshot.nodes_stats(), (1, Some(0)));
// Check first update
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
assert!(snapshot.has_nodes());
let node_with_metrics = snapshot.nodes_with_metrics.first().unwrap();
assert_eq!(node_with_metrics.score, (2.0 / 1.0) / 2.0);
assert_eq!(snapshot.next_node().unwrap(), node);
assert_eq!(snapshot.nodes_stats(), (1, Some(1)));
// Check second update
let health = HealthCheckStatus::new(Some(Duration::from_secs(2)));
let is_updated = snapshot.update_node(&node, health);
@@ -399,6 +415,7 @@ mod tests {
assert_eq!(snapshot.nodes_with_metrics.len(), 1);
assert_eq!(snapshot.existing_nodes.len(), 1);
assert!(snapshot.next_node().is_none());
assert_eq!(snapshot.nodes_stats(), (1, Some(0)));
}

#[test]
Original file line number Diff line number Diff line change
@@ -106,6 +106,10 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
self.healthy_nodes.remove(node)
}
}

fn nodes_stats(&self) -> (usize, Option<usize>) {
(self.existing_nodes.len(), Some(self.healthy_nodes.len()))
}
}

#[cfg(test)]
Original file line number Diff line number Diff line change
@@ -15,4 +15,11 @@ pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
fn sync_nodes(&mut self, nodes: &[Node]) -> bool;
/// Updates the health status of a specific node, returning `true` if the node was found and updated.
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool;
/// Returns the total number of nodes and healthy nodes as a tuple.
///
/// - First element is the total number of nodes available (both healthy and unhealthy)
/// - Second element is the number of currently healthy nodes, or None if health status information is unavailable
///
/// The specific criteria for what constitutes a "healthy" node is implementation dependent.
fn nodes_stats(&self) -> (usize, Option<usize>);
}
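
The bookkeeping behind `nodes_stats()` in the snapshot changes above can be sketched in isolation. This is a minimal illustration, not the crate's implementation: `SnapshotSketch` and `add_node` are hypothetical, nodes are plain strings rather than `Node`, health is a `bool` rather than `HealthCheckStatus`, and returning `false` for unknown nodes is an assumption of the sketch:

```rust
use std::collections::HashSet;

/// Hypothetical, simplified snapshot that tracks two sets of node names.
#[derive(Default)]
pub struct SnapshotSketch {
    existing_nodes: HashSet<String>,
    healthy_nodes: HashSet<String>,
}

impl SnapshotSketch {
    /// Registers a node in the topology; it is not healthy until an update says so.
    pub fn add_node(&mut self, node: &str) {
        self.existing_nodes.insert(node.to_string());
    }

    /// Records a health result; returns `false` if the node is not in the topology.
    pub fn update_node(&mut self, node: &str, is_healthy: bool) -> bool {
        if !self.existing_nodes.contains(node) {
            return false;
        }
        if is_healthy {
            self.healthy_nodes.insert(node.to_string());
        } else {
            self.healthy_nodes.remove(node);
        }
        true
    }

    /// Same shape as in the diff: (total nodes, Some(healthy nodes)).
    pub fn nodes_stats(&self) -> (usize, Option<usize>) {
        (self.existing_nodes.len(), Some(self.healthy_nodes.len()))
    }
}
```

Keeping a separate healthy set makes the stats call a pair of `len()` lookups, at the cost of updating that set on every health check.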