Add tests for async cluster scan behavior with uncovered slots and node failures

Signed-off-by: avifenesh <[email protected]>
avifenesh committed Nov 29, 2024
1 parent af80be4 commit d2ac4f5
Showing 3 changed files with 166 additions and 3 deletions.
@@ -236,6 +236,7 @@ mod test_connect_and_check {
ConnectionDetails {
conn: user_conn,
ip: Some(ip),
az: None,
}
.into_future(),
None,
@@ -283,6 +284,7 @@ mod test_connect_and_check {
ConnectionDetails {
conn: user_conn,
ip: prev_ip,
az: None,
}
.into_future(),
None,
@@ -339,12 +341,14 @@ mod test_connect_and_check {
ConnectionDetails {
conn: old_user_conn,
ip: Some(prev_ip),
az: None,
}
.into_future(),
Some(
ConnectionDetails {
conn: management_conn,
ip: Some(prev_ip),
az: None,
}
.into_future(),
),
@@ -380,12 +384,14 @@ mod test_check_node_connections {
ConnectionDetails {
conn: get_mock_connection_with_port(name, 1, 6380),
ip,
az: None,
}
.into_future(),
Some(
ConnectionDetails {
conn: get_mock_connection_with_port(name, 2, 6381),
ip,
az: None,
}
.into_future(),
),
@@ -463,6 +469,7 @@ mod test_check_node_connections {
ConnectionDetails {
conn: get_mock_connection(name, 1),
ip,
az: None,
}
.into_future(),
None,
@@ -547,6 +554,7 @@ mod test_check_node_connections {
ConnectionDetails {
conn: get_mock_connection(name, 1),
ip: None,
az: None,
}
.into_future(),
None,
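Every ConnectionDetails initializer in this file gains an az: None field, which suggests the struct now carries an availability-zone slot alongside the connection and IP. A minimal sketch of what such a struct could look like, assuming az is an Option<String> and ip an Option<IpAddr>; the field names come from the initializers above, while the types and the generic connection parameter are assumptions, not the crate's actual definition:

use std::net::{IpAddr, Ipv4Addr};

// Hypothetical sketch only: the field names mirror the initializers in the diff
// (conn, ip, az); the Option types and the generic parameter are illustrative
// assumptions, not the crate's real definition.
struct ConnectionDetails<C> {
    conn: C,            // user or management connection object
    ip: Option<IpAddr>, // node IP, when already known
    az: Option<String>, // availability zone; the new field, None throughout these tests
}

fn main() {
    // Constructed the same way the updated tests do.
    let details = ConnectionDetails {
        conn: "mock-connection",
        ip: Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))),
        az: None,
    };
    println!("ip = {:?}, az = {:?}", details.ip, details.az);
}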
160 changes: 158 additions & 2 deletions glide-core/redis-rs/redis/tests/test_cluster_scan.rs
@@ -5,7 +5,9 @@ mod support;
mod test_cluster_scan_async {
use crate::support::*;
use rand::Rng;
use redis::cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo, SingleNodeRoutingInfo};
use redis::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo, SingleNodeRoutingInfo,
};
use redis::{cmd, from_redis_value, ObjectType, RedisResult, ScanStateRC, Value};
use std::time::Duration;

@@ -905,7 +907,8 @@ mod test_cluster_scan_async {

#[tokio::test]
#[serial_test::serial]
async fn test_async_cluster_scan_with_missing_node() {
/// Test a case where a node is killed, keys are set in the cluster, and the client can still scan all keys
async fn test_async_cluster_scan_uncovered_slots_of_missing_node() {
// Create a cluster with 3 nodes
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
@@ -984,4 +987,157 @@ mod test_cluster_scan_async {
}
assert!(keys.len() > 0);
}

#[tokio::test]
#[serial_test::serial]
/// Test scanning after killing a node and compare with "KEYS *" from remaining nodes
async fn test_async_cluster_scan_after_node_killed() {
// Create a cluster with 3 nodes
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(0),
false,
);
let mut connection = cluster.async_connection(None).await;

// Set cluster-require-full-coverage to no
let mut config_cmd = cmd("CONFIG");
config_cmd
.arg("SET")
.arg("cluster-require-full-coverage")
.arg("no");
let _: RedisResult<Value> = connection
.route_command(
&config_cmd,
RoutingInfo::MultiNode((MultipleNodeRoutingInfo::AllNodes, None)),
)
.await;

for i in 0..100 {
let key = format!("key{}", i);
let _res: RedisResult<()> = redis::cmd("SET")
.arg(&key)
.arg("value")
.query_async(&mut connection)
.await;
}

// Kill one node
let cluster_nodes = cluster.get_cluster_nodes().await;
let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes);
let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await;
let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await;
match ready {
Ok(_) => {}
Err(e) => {
println!("error: {:?}", e);
}
}

// Scan the keys
let mut scan_state_rc = ScanStateRC::new();
let mut keys: Vec<String> = vec![];
loop {
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, None, None, true)
.await
.unwrap();
scan_state_rc = next_cursor;
let mut scan_keys = scan_keys
.into_iter()
.map(|v| from_redis_value(&v).unwrap())
                .collect::<Vec<String>>(); // Convert the returned values into Strings
keys.append(&mut scan_keys);
if scan_state_rc.is_finished() {
break;
}
}

// Get keys from remaining nodes using "KEYS *"
let mut keys_from_keys_command: Vec<String> = Vec::new();
let key_res: RedisResult<Value> = connection
.route_command(
cmd("KEYS").arg("*"),
RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
Some(ResponsePolicy::CombineArrays),
)),
)
.await;
if let Ok(value) = key_res {
let values: Vec<Value> = from_redis_value(&value).unwrap();
keys_from_keys_command
.extend(values.into_iter().map(|v| from_redis_value(&v).unwrap()));
}

// Sort and dedup keys
keys.sort();
keys.dedup();
keys_from_keys_command.sort();
keys_from_keys_command.dedup();

// Check if scanned keys match keys from "KEYS *"
assert_eq!(keys, keys_from_keys_command);
}

#[tokio::test]
#[serial_test::serial]
/// Test scanning with allow_non_covered_slots as false after killing a node
async fn test_async_cluster_scan_uncovered_slots_fail() {
// Create a cluster with 3 nodes
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(0),
false,
);
let mut connection = cluster.async_connection(None).await;

// Kill one node
let cluster_nodes = cluster.get_cluster_nodes().await;
let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes);
let killed_node_routing = kill_one_node(&cluster, slot_distribution.clone()).await;
let ready = cluster.wait_for_fail_to_finish(&killed_node_routing).await;
match ready {
Ok(_) => {}
Err(e) => {
println!("error: {:?}", e);
}
}

for i in 0..100 {
let key = format!("key{}", i);
let _res: RedisResult<()> = redis::cmd("SET")
.arg(&key)
.arg("value")
.query_async(&mut connection)
.await;
}

// Try scanning with allow_non_covered_slots as false
let mut scan_state_rc = ScanStateRC::new();
let mut had_error = false;
loop {
let result = connection
.cluster_scan(scan_state_rc.clone(), None, None, false)
.await;

match result {
Ok((next_cursor, _)) => {
scan_state_rc = next_cursor;
if scan_state_rc.is_finished() {
break;
}
}
Err(e) => {
had_error = true;
assert_eq!(e.kind(), redis::ErrorKind::NotAllSlotsCovered);
break;
}
}
}

assert!(had_error);
}
}
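Taken together, the new tests pin down the contract of the allow_non_covered_slots flag: with true the scan skips slots that no reachable node serves and still runs to completion, with false it fails with ErrorKind::NotAllSlotsCovered once such a slot is hit. A condensed sketch of a caller built on that contract, reusing the cluster_scan(state, match_pattern, count, allow_non_covered_slots) call shape and the ScanStateRC and from_redis_value items from the tests above; the helper name and the concrete connection type are assumptions for illustration:

// Illustrative helper, not part of the crate: drives a full cluster scan and
// returns every key seen. The call shape of cluster_scan, ScanStateRC, and
// ErrorKind::NotAllSlotsCovered are taken from the tests above; the
// ClusterConnection type used here is an assumption.
async fn collect_all_keys(
    connection: &mut redis::cluster_async::ClusterConnection,
    allow_non_covered_slots: bool,
) -> redis::RedisResult<Vec<String>> {
    let mut state = redis::ScanStateRC::new();
    let mut keys = Vec::new();
    loop {
        // match_pattern and count stay None, exactly as in the tests.
        let (next_state, batch): (redis::ScanStateRC, Vec<redis::Value>) = connection
            .cluster_scan(state, None, None, allow_non_covered_slots)
            .await?; // with allow_non_covered_slots == false, a missing node
                     // surfaces here as ErrorKind::NotAllSlotsCovered
        state = next_state;
        for v in batch {
            keys.push(redis::from_redis_value(&v)?);
        }
        if state.is_finished() {
            break;
        }
    }
    Ok(keys)
}

With allow_non_covered_slots set to true this mirrors the comparison against KEYS * in the second test; with false it propagates the same error the third test asserts on.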
1 change: 0 additions & 1 deletion python/python/tests/test_scan.py
@@ -23,7 +23,6 @@ async def is_cluster_ready(client: GlideClusterClient, count: int) -> bool:
while True:
if asyncio.get_event_loop().time() - start > timeout:
return False
print(asyncio.get_event_loop().time() - start)
nodes_raw = await client.custom_command(["CLUSTER", "NODES"])
node_bytes_raw = cast(bytes, nodes_raw)
parsed_nodes = [
