diff --git a/glide-core/redis-rs/redis/src/commands/cluster_scan.rs b/glide-core/redis-rs/redis/src/commands/cluster_scan.rs index 20b58eedd0..255ad809b9 100644 --- a/glide-core/redis-rs/redis/src/commands/cluster_scan.rs +++ b/glide-core/redis-rs/redis/src/commands/cluster_scan.rs @@ -582,7 +582,17 @@ where // Mark the current slot as scanned mark_slot_as_scanned(scanned_slots_map, slot); // Move to the next slot - slot = slot.saturating_add(1); + loop { + slot = slot.saturating_add(1); + if slot == END_OF_SCAN { + return Ok(NextNodeResult::AllSlotsCompleted); + } + let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize; + let slot_bit = slot as u64 % (BITS_PER_U64 as u64); + if scanned_slots_map[slot_index] & (1 << slot_bit) == 0 { + break; + } + } } None => { // Error if slots are not covered and scanning is not allowed diff --git a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs index 320951367a..a45a3d61f0 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_scan.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_scan.rs @@ -97,6 +97,220 @@ mod test_cluster_scan_async { } } + #[tokio::test] + #[serial_test::serial] + async fn test_async_cluster_scan_with_allow_non_covered_slots() { + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); + + let mut connection = cluster.async_connection(None).await; + let mut expected_keys: Vec = Vec::new(); + + for i in 0..1000 { + let key = format!("key{}", i); + let _: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + expected_keys.push(key); + } + + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = Vec::new(); + loop { + let cluster_scan_args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, cluster_scan_args) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + + keys.sort(); + expected_keys.sort(); + assert_eq!(keys, expected_keys); + } + + #[tokio::test] + #[serial_test::serial] + async fn test_async_cluster_scan_with_few_delslots() { + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); + let mut connection = cluster.async_connection(None).await; + let mut expected_keys: Vec = Vec::new(); + + for i in 0..1000 { + let key = format!("key{}", i); + let _: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + expected_keys.push(key); + } + + let mut delslots_cmd = cmd("CLUSTER"); + // delete a few slots + delslots_cmd.arg("DELSLOTSRANGE").arg("0").arg("100"); + let _: RedisResult = connection + .route_command( + &delslots_cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::AllSucceeded), + )), + ) + .await; + + let mut slots_cmd = cmd("CLUSTER"); + slots_cmd.arg("SLOTS"); + let slots_info: Value = connection + .route_command( + &slots_cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .unwrap(); + + if let Value::Array(slots) = slots_info { + println!("slots: {:?}", slots); + for slot_info in slots { + if let Value::Array(slot_details) = slot_info { + if let (Value::Int(start), Value::Int(end)) = + (slot_details[0].clone(), slot_details[1].clone()) + { + assert!(end < 0 || start > 100); + } + } + } + } + + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = Vec::new(); + loop { + let cluster_scan_args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, cluster_scan_args) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + + keys.sort(); + expected_keys.sort(); + assert_eq!(keys, expected_keys); + } + + #[tokio::test] + #[serial_test::serial] + async fn test_async_cluster_scan_with_all_delslots() { + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); + let mut connection = cluster.async_connection(None).await; + let mut expected_keys: Vec = Vec::new(); + + for i in 0..1000 { + let key = format!("key{}", i); + let _: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + expected_keys.push(key); + } + + let mut delslots_cmd = cmd("CLUSTER"); + // delete all slots + delslots_cmd.arg("DELSLOTSRANGE").arg("0").arg("16383"); + let _: RedisResult = connection + .route_command( + &delslots_cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::AllSucceeded), + )), + ) + .await; + + let mut slots_cmd = cmd("CLUSTER"); + slots_cmd.arg("SLOTS"); + let slots_info: Value = connection + .route_command( + &slots_cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllNodes, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .unwrap(); + + if let Value::Array(slots) = slots_info { + assert_eq!(slots.len(), 0); + } + + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = Vec::new(); + loop { + let cluster_scan_args = ClusterScanArgs::builder() + .allow_non_covered_slots(true) + .build(); + let (next_cursor, scan_keys): (ScanStateRC, Vec) = connection + .cluster_scan(scan_state_rc, cluster_scan_args) + .await + .unwrap(); + scan_state_rc = next_cursor; + let mut scan_keys = scan_keys + .into_iter() + .map(|v| from_redis_value(&v).unwrap()) + .collect::>(); + keys.append(&mut scan_keys); + if scan_state_rc.is_finished() { + break; + } + } + + keys.sort(); + expected_keys.sort(); + assert_eq!(keys, expected_keys); + } + #[tokio::test] #[serial_test::serial] // test cluster scan with slot migration in the middle async fn test_async_cluster_scan_with_migration() { @@ -262,7 +476,7 @@ mod test_cluster_scan_async { .allow_non_covered_slots(true) .build(); loop { - let res = connection + let res: Result<(ScanStateRC, Vec), redis::RedisError> = connection .cluster_scan(scan_state_rc.clone(), args.clone()) .await; let (next_cursor, scan_keys): (ScanStateRC, Vec) = match res {