From cfb75073ee7364c5fc439a2abf3570ae8e4a5683 Mon Sep 17 00:00:00 2001 From: Mustaque Ahmed Date: Mon, 19 Aug 2024 02:26:12 +0530 Subject: [PATCH 1/3] test: add test cases for `server.rs` --- examples/simple_run.rs | 2 +- examples/simulate_add_node.rs | 4 +- examples/simulate_node_failure.rs | 4 +- src/server.rs | 63 ++++++++++++++++++++++++++++++- 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/examples/simple_run.rs b/examples/simple_run.rs index c367c92..951c94f 100644 --- a/examples/simple_run.rs +++ b/examples/simple_run.rs @@ -47,7 +47,7 @@ async fn main() { let cc = cluster_config.clone(); handles.push(tokio::spawn(async move { let mut server = Server::new(id, config, cc).await; - server.start().await; + server.start(None).await; })); } diff --git a/examples/simulate_add_node.rs b/examples/simulate_add_node.rs index a164c87..4cd6e67 100644 --- a/examples/simulate_add_node.rs +++ b/examples/simulate_add_node.rs @@ -44,7 +44,7 @@ async fn main() { let cc = cluster_config.clone(); handles.push(tokio::spawn(async move { let mut server = Server::new(id, config, cc).await; - server.start().await; + server.start(None).await; })); } @@ -65,7 +65,7 @@ async fn main() { // Launching a new node handles.push(tokio::spawn(async move { let mut server = Server::new(new_node_id, new_node_conf, cluster_config).await; - server.start().await; + server.start(None).await; })); // Simulate sending a Raft Join request after a few seconds diff --git a/examples/simulate_node_failure.rs b/examples/simulate_node_failure.rs index 0c3011f..98c1355 100644 --- a/examples/simulate_node_failure.rs +++ b/examples/simulate_node_failure.rs @@ -46,7 +46,7 @@ async fn main() { let cc = cluster_config.clone(); let server_handle = tokio::spawn(async move { let mut server = Server::new(id, config, cc).await; - server.start().await; + server.start(None).await; }); server_handles.push(server_handle); } @@ -78,7 +78,7 @@ async fn main() { let cc = cluster_config.clone(); let server_handle = tokio::spawn(async move { let mut server = Server::new(server_to_stop.try_into().unwrap(), config, cc).await; - server.start().await; + server.start(None).await; }); server_handles[server_to_stop - 1] = server_handle; } diff --git a/src/server.rs b/src/server.rs index 910c31c..135acee 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,6 +13,7 @@ use std::net::SocketAddr; use std::time::{Duration, Instant}; use tokio::io::AsyncReadExt; use tokio::time::sleep; +use std::sync::mpsc; #[derive(Debug, Clone, PartialEq)] enum RaftState { @@ -135,7 +136,7 @@ impl Server { } } - pub async fn start(&mut self) { + pub async fn start(&mut self, rx: Option>) { if let Err(e) = self.network_manager.open().await { error!(self.log, "Failed to open network manager: {}", e); return; @@ -159,6 +160,11 @@ impl Server { RaftState::Candidate => self.candidate().await, RaftState::Leader => self.leader().await, } + if let Some(ref rx) = rx { + if rx.try_recv().is_ok() { + break; + } + } } } @@ -964,3 +970,58 @@ impl Server { self.peers().len() } } + +#[cfg(test)] +mod tests { + + use crate::cluster::{ClusterConfig, NodeMeta}; + use crate::server::{Server, ServerConfig, RaftState}; + use std::time::Duration; + use std::net::SocketAddr; + use std::str::FromStr; + use std::collections::HashMap; + use std::sync::mpsc; + use std::thread; + + async fn server_util() -> Server { + let id: u32 = 3; + let peers = vec![ + NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())), + NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())), + NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())), + NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())), + NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())), + ]; + let addr: SocketAddr = peers[0].address; + let cluster_config = ClusterConfig::new(peers.clone()); + let server_config = ServerConfig { + election_timeout: Duration::from_millis(10), + address: addr, + default_leader: Some(3u32), + leadership_preferences: HashMap::new(), + storage_location: Some("".to_string()), + }; + Server::new(id, server_config, cluster_config).await + } + + #[tokio::test] + async fn test_server() { + let server = server_util().await.clone(); + assert_eq!(server.state.state, RaftState::Follower) + } + + #[tokio::test] + async fn test_server_start() { + // will send signal over channel to start method to exit out of the loop + let (tx, rx) = mpsc::channel(); + let t = thread::spawn(move || async { + let mut server = server_util().await.clone(); + server.start(Some(rx)).await; + }); + + thread::sleep(Duration::from_millis(100)); + tx.send(()).unwrap(); + let _ = t.join().unwrap(); + } + +} From 37a499497dd7d3193558595b78abf6f3e49efc41 Mon Sep 17 00:00:00 2001 From: Mustaque Ahmed Date: Mon, 19 Aug 2024 02:42:33 +0530 Subject: [PATCH 2/3] feat: add `clone` to structs --- src/server.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server.rs b/src/server.rs index 135acee..35ced56 100644 --- a/src/server.rs +++ b/src/server.rs @@ -39,7 +39,7 @@ enum MessageType { JoinResponse, } -#[derive(Debug)] +#[derive(Debug, Clone)] struct ServerState { current_term: u32, state: RaftState, @@ -70,7 +70,7 @@ pub struct LogEntry { pub data: u32, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ServerConfig { pub election_timeout: Duration, pub address: SocketAddr, @@ -80,6 +80,7 @@ pub struct ServerConfig { pub storage_location: Option, } +#[derive(Clone)] pub struct Server { pub id: u32, state: ServerState, From 65d58ce3444897ff34cc674b65892cca65bbf819 Mon Sep 17 00:00:00 2001 From: Mustaque Ahmed Date: Mon, 19 Aug 2024 02:55:28 +0530 Subject: [PATCH 3/3] feat: add test case for `is_quorum` --- src/server.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/server.rs b/src/server.rs index 35ced56..4ab14dc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1025,4 +1025,27 @@ mod tests { let _ = t.join().unwrap(); } + #[tokio::test] + async fn test_server_start_failed_quorum() { + let id: u32 = 2; + let peers = vec![ + NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())), + NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())), + ]; + let addr: SocketAddr = peers[0].address; + let cluster_config = ClusterConfig::new(peers.clone()); + let server_config = ServerConfig { + election_timeout: Duration::from_millis(10), + address: addr, + default_leader: Some(1u32), + leadership_preferences: HashMap::new(), + storage_location: Some("".to_string()), + }; + let mut server = Server::new(id, server_config, cluster_config).await; + server.start(None).await; + assert_eq!(server.peer_count(), 1); + let votes = server.state.votes_received.len(); + assert_eq!(server.is_quorum(votes.try_into().unwrap()), false); + } + }