diff --git a/glide-core/redis-rs/redis/tests/test_async_cluster_connections_logic.rs b/glide-core/redis-rs/redis/tests/test_async_cluster_connections_logic.rs
index 356c5bfc8c..1b4acb3a20 100644
--- a/glide-core/redis-rs/redis/tests/test_async_cluster_connections_logic.rs
+++ b/glide-core/redis-rs/redis/tests/test_async_cluster_connections_logic.rs
@@ -236,6 +236,7 @@ mod test_connect_and_check {
             ConnectionDetails {
                 conn: user_conn,
                 ip: Some(ip),
+                az: None,
             }
             .into_future(),
             None,
@@ -283,6 +284,7 @@ mod test_connect_and_check {
             ConnectionDetails {
                 conn: user_conn,
                 ip: prev_ip,
+                az: None,
             }
             .into_future(),
             None,
@@ -339,12 +341,14 @@ mod test_connect_and_check {
             ConnectionDetails {
                 conn: old_user_conn,
                 ip: Some(prev_ip),
+                az: None,
             }
             .into_future(),
             Some(
                 ConnectionDetails {
                     conn: management_conn,
                     ip: Some(prev_ip),
+                    az: None,
                 }
                 .into_future(),
             ),
@@ -380,12 +384,14 @@ mod test_check_node_connections {
             ConnectionDetails {
                 conn: get_mock_connection_with_port(name, 1, 6380),
                 ip,
+                az: None,
             }
             .into_future(),
             Some(
                 ConnectionDetails {
                     conn: get_mock_connection_with_port(name, 2, 6381),
                     ip,
+                    az: None,
                 }
                 .into_future(),
             ),
@@ -463,6 +469,7 @@ mod test_check_node_connections {
             ConnectionDetails {
                 conn: get_mock_connection(name, 1),
                 ip,
+                az: None,
             }
             .into_future(),
             None,
@@ -547,6 +554,7 @@ mod test_check_node_connections {
             ConnectionDetails {
                 conn: get_mock_connection(name, 1),
                 ip: None,
+                az: None,
             }
             .into_future(),
             None,
diff --git a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs
index 48df6fc9ab..0e8a3ca3b2 100644
--- a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs
+++ b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs
@@ -5,7 +5,9 @@ mod support;
 mod test_cluster_scan_async {
     use crate::support::*;
     use rand::Rng;
-    use redis::cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo, SingleNodeRoutingInfo};
+    use redis::cluster_routing::{
+        MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo,
+    };
     use redis::{cmd, from_redis_value, ObjectType, RedisResult, ScanStateRC, Value};
     use std::time::Duration;
 
@@ -905,7 +907,8 @@ mod test_cluster_scan_async {
 
     #[tokio::test]
     #[serial_test::serial]
-    async fn test_async_cluster_scan_with_missing_node() {
+    /// Test a case where a node is killed, keys are set in the cluster, and the client is still able to scan all keys
+    async fn test_async_cluster_scan_uncovered_slots_of_missing_node() {
         // Create a cluster with 3 nodes
         let cluster = TestClusterContext::new_with_cluster_client_builder(
             3,
@@ -984,4 +987,157 @@
         }
         assert!(keys.len() > 0);
     }
+
+    #[tokio::test]
+    #[serial_test::serial]
+    /// Test scanning after killing a node, comparing the results with "KEYS *" from the remaining nodes
+    async fn test_async_cluster_scan_after_node_killed() {
+        // Create a cluster with 3 nodes
+        let cluster = TestClusterContext::new_with_cluster_client_builder(
+            3,
+            0,
+            |builder| builder.retries(0),
+            false,
+        );
+        let mut connection = cluster.async_connection(None).await;
+
+        // Set cluster-require-full-coverage to no
+        let mut config_cmd = cmd("CONFIG");
+        config_cmd
+            .arg("SET")
+            .arg("cluster-require-full-coverage")
+            .arg("no");
+        let _: RedisResult<Value> = connection
+            .route_command(
+                &config_cmd,
+                RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)),
+            )
+            .await;
+
+        for i in 0..100 {
+            let key = format!("key{}", i);
+            let _res: RedisResult<()> = redis::cmd("SET")
+                .arg(&key)
+                .arg("value")
+                .query_async(&mut connection)
+                .await;
+        }
+
+        // Kill one node
+        let cluster_nodes = cluster.get_cluster_nodes().await;
+        let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes);
+        let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await;
+        let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await;
+        match ready {
+            Ok(_) => {}
+            Err(e) => {
+                println!("error: {:?}", e);
+            }
+        }
+
+        // Scan the keys
+        let mut scan_state_rc = ScanStateRC::new();
+        let mut keys: Vec<String> = vec![];
+        loop {
+            let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
+                .cluster_scan(scan_state_rc, None, None, true)
+                .await
+                .unwrap();
+            scan_state_rc = next_cursor;
+            let mut scan_keys = scan_keys
+                .into_iter()
+                .map(|v| from_redis_value(&v).unwrap())
+                .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
+            keys.append(&mut scan_keys);
+            if scan_state_rc.is_finished() {
+                break;
+            }
+        }
+
+        // Get keys from remaining nodes using "KEYS *"
+        let mut keys_from_keys_command: Vec<String> = Vec::new();
+        let key_res: RedisResult<Value> = connection
+            .route_command(
+                cmd("KEYS").arg("*"),
+                RoutingInfo::MultiNode((
+                    MultipleNodeRoutingInfo::AllNodes,
+                    Some(ResponsePolicy::CombineArrays),
+                )),
+            )
+            .await;
+        if let Ok(value) = key_res {
+            let values: Vec<Value> = from_redis_value(&value).unwrap();
+            keys_from_keys_command
+                .extend(values.into_iter().map(|v| from_redis_value(&v).unwrap()));
+        }
+
+        // Sort and dedup keys
+        keys.sort();
+        keys.dedup();
+        keys_from_keys_command.sort();
+        keys_from_keys_command.dedup();
+
+        // Check if scanned keys match keys from "KEYS *"
+        assert_eq!(keys, keys_from_keys_command);
+    }
+
+    #[tokio::test]
+    #[serial_test::serial]
+    /// Test scanning with allow_non_covered_slots set to false after killing a node
+    async fn test_async_cluster_scan_uncovered_slots_fail() {
+        // Create a cluster with 3 nodes
+        let cluster = TestClusterContext::new_with_cluster_client_builder(
+            3,
+            0,
+            |builder| builder.retries(0),
+            false,
+        );
+        let mut connection = cluster.async_connection(None).await;
+
+        // Kill one node
+        let cluster_nodes = cluster.get_cluster_nodes().await;
+        let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes);
+        let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await;
+        let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await;
+        match ready {
+            Ok(_) => {}
+            Err(e) => {
+                println!("error: {:?}", e);
+            }
+        }
+
+        for i in 0..100 {
+            let key = format!("key{}", i);
+            let _res: RedisResult<()> = redis::cmd("SET")
+                .arg(&key)
+                .arg("value")
+                .query_async(&mut connection)
+                .await;
+        }
+
+        // Try scanning with allow_non_covered_slots as false
+        let mut scan_state_rc = ScanStateRC::new();
+        let mut had_error = false;
+        loop {
+            let result = connection
+                .cluster_scan(scan_state_rc.clone(), None, None, false)
+                .await;
+
+            match result {
+                Ok((next_cursor, _)) => {
+                    scan_state_rc = next_cursor;
+                    if scan_state_rc.is_finished() {
+                        break;
+                    }
+                }
+                Err(e) => {
+                    had_error = true;
+                    assert_eq!(e.kind(), redis::ErrorKind::NotAllSlotsCovered);
+                    break;
+                }
+            }
+        }
+
+        assert!(had_error);
+    }
 }
diff --git a/python/python/tests/test_scan.py b/python/python/tests/test_scan.py
index 9696526493..c29235b6c1 100644
--- a/python/python/tests/test_scan.py
+++ b/python/python/tests/test_scan.py
@@ -23,7 +23,6 @@ async def is_cluster_ready(client: GlideClusterClient, count: int) -> bool:
     while True:
         if asyncio.get_event_loop().time() - start > timeout:
             return False
-        print(asyncio.get_event_loop().time() - start)
         nodes_raw = await client.custom_command(["CLUSTER", "NODES"])
         node_bytes_raw = cast(bytes, nodes_raw)
         parsed_nodes = [