Feature dummy scan #8

Status: Open. Wants to merge 2 commits into base branch Feature-dummyScan.
12 changes: 11 additions & 1 deletion glide-core/redis-rs/redis/src/commands/cluster_scan.rs
@@ -582,7 +582,17 @@ where
            // Mark the current slot as scanned
            mark_slot_as_scanned(scanned_slots_map, slot);
            // Move to the next slot
-           slot = slot.saturating_add(1);
+           loop {
+               slot = slot.saturating_add(1);
+               if slot == END_OF_SCAN {
+                   return Ok(NextNodeResult::AllSlotsCompleted);
+               }
+               let slot_index = (slot as u64 / BITS_PER_U64 as u64) as usize;
+               let slot_bit = slot as u64 % (BITS_PER_U64 as u64);
+               if scanned_slots_map[slot_index] & (1 << slot_bit) == 0 {
+                   break;
+               }
+           }
        }
        None => {
            // Error if slots are not covered and scanning is not allowed

Owner (on the new loop): I see what I missed, but we already have a next_slot function, so you can use it; this duplicates that logic.
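The reviewer's suggested refactor could look like the sketch below. It is a standalone stand-in, not the crate's actual code: the signature of `next_slot` and the constant definitions here are assumptions made so the function is self-contained.

```rust
const BITS_PER_U64: usize = 64;
// 16384 hash slots, numbered 0..=16383; one past the last means "done".
const END_OF_SCAN: u16 = 16384;

// Hypothetical standalone version of the `next_slot` helper the reviewer
// mentions: advance past slots already marked in the scanned-slots bitmap,
// returning None once every slot has been visited.
fn next_slot(scanned_slots_map: &[u64], mut slot: u16) -> Option<u16> {
    loop {
        slot = slot.saturating_add(1);
        if slot == END_OF_SCAN {
            return None;
        }
        let slot_index = slot as usize / BITS_PER_U64;
        let slot_bit = slot as usize % BITS_PER_U64;
        if scanned_slots_map[slot_index] & (1u64 << slot_bit) == 0 {
            return Some(slot);
        }
    }
}
```

With a helper of this shape, the inline loop in the hunk collapses to a single call, with `None` mapped to `NextNodeResult::AllSlotsCompleted`.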
216 changes: 215 additions & 1 deletion glide-core/redis-rs/redis/tests/test_cluster_scan.rs
@@ -97,6 +97,220 @@ mod test_cluster_scan_async {
}
}

    #[tokio::test]
    #[serial_test::serial]
    async fn test_async_cluster_scan_with_allow_non_covered_slots() {
        let cluster = TestClusterContext::new_with_cluster_client_builder(
            3,
            0,
            |builder| builder.retries(1),
            false,
        );

        let mut connection = cluster.async_connection(None).await;
        let mut expected_keys: Vec<String> = Vec::new();

        for i in 0..1000 {
            let key = format!("key{}", i);
            let _: Result<(), redis::RedisError> = redis::cmd("SET")
                .arg(&key)
                .arg("value")
                .query_async(&mut connection)
                .await;
            expected_keys.push(key);
        }

        let mut scan_state_rc = ScanStateRC::new();
        let mut keys: Vec<String> = Vec::new();
        loop {
            let cluster_scan_args = ClusterScanArgs::builder()
                .allow_non_covered_slots(true)
                .build();
            let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
                .cluster_scan(scan_state_rc, cluster_scan_args)
                .await
                .unwrap();
            scan_state_rc = next_cursor;
            let mut scan_keys = scan_keys
                .into_iter()
                .map(|v| from_redis_value(&v).unwrap())
                .collect::<Vec<String>>();
            keys.append(&mut scan_keys);
            if scan_state_rc.is_finished() {
                break;
            }
        }

        keys.sort();
        expected_keys.sort();
        assert_eq!(keys, expected_keys);
    }
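All three new tests repeat the same cursor loop verbatim; in the spirit of the review comments about extracting helpers, it could be factored out. A minimal sketch, generic over a paging closure that stands in for one `cluster_scan` round trip (the closure shape is an assumption for illustration, not the crate's API):

```rust
// Hypothetical helper: drive a cursor-based scan to completion.
// `scan_page` stands in for one `cluster_scan` call: it takes the current
// cursor and returns (next_cursor, page_of_keys, finished).
fn collect_all_keys<C, F>(mut cursor: C, mut scan_page: F) -> Vec<String>
where
    F: FnMut(C) -> (C, Vec<String>, bool),
{
    let mut keys = Vec::new();
    loop {
        let (next_cursor, mut page, finished) = scan_page(cursor);
        keys.append(&mut page);
        if finished {
            return keys;
        }
        cursor = next_cursor;
    }
}
```

Each test would then reduce to one call plus the sort-and-compare assertion.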

    #[tokio::test]
    #[serial_test::serial]
    async fn test_async_cluster_scan_with_few_delslots() {
        let cluster = TestClusterContext::new_with_cluster_client_builder(
            3,
            0,
            |builder| builder.retries(1),
            false,
        );
        let mut connection = cluster.async_connection(None).await;
        let mut expected_keys: Vec<String> = Vec::new();

        for i in 0..1000 {
            let key = format!("key{}", i);
            let _: Result<(), redis::RedisError> = redis::cmd("SET")
                .arg(&key)
                .arg("value")
                .query_async(&mut connection)
                .await;
            expected_keys.push(key);
        }

        let mut delslots_cmd = cmd("CLUSTER");
        // delete a few slots
        delslots_cmd.arg("DELSLOTSRANGE").arg("0").arg("100");
        let _: RedisResult<Value> = connection
            .route_command(
                &delslots_cmd,
                RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllNodes,
                    Some(ResponsePolicy::AllSucceeded),
                )),
            )
            .await;

        let mut slots_cmd = cmd("CLUSTER");
        slots_cmd.arg("SLOTS");
        let slots_info: Value = connection
            .route_command(
                &slots_cmd,
                RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllNodes,
                    Some(ResponsePolicy::OneSucceeded),
                )),
            )
            .await
            .unwrap();

        if let Value::Array(slots) = slots_info {
            println!("slots: {:?}", slots);
            for slot_info in slots {
                if let Value::Array(slot_details) = slot_info {
                    if let (Value::Int(start), Value::Int(end)) =
                        (slot_details[0].clone(), slot_details[1].clone())
                    {
                        assert!(end < 0 || start > 100);
                    }
                }
            }
        }

        let mut scan_state_rc = ScanStateRC::new();
        let mut keys: Vec<String> = Vec::new();
        loop {
            let cluster_scan_args = ClusterScanArgs::builder()
                .allow_non_covered_slots(true)
                .build();
            let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
                .cluster_scan(scan_state_rc, cluster_scan_args)
                .await
                .unwrap();
            scan_state_rc = next_cursor;
            let mut scan_keys = scan_keys
                .into_iter()
                .map(|v| from_redis_value(&v).unwrap())
                .collect::<Vec<String>>();
            keys.append(&mut scan_keys);
            if scan_state_rc.is_finished() {
                break;
            }
        }

        keys.sort();
        expected_keys.sort();
        assert_eq!(keys, expected_keys);
    }

Owner (on the `if let Value::Array(slots)` block): Comment what you're doing here. I have get_slots_ranges_distribution; I don't fully remember the logic, but I created it to compare cluster state before and after changes, so I think you can use it. In any case, I would extract this into a helper so it's more readable and reusable.

Owner (on the final assertion): That may or may not pass. Deleting slots means deleting their keys, so the keys you expect may not be there.

Author: Deleting a slot simply removes the slot assignment from the node, but the key data existing on each node remains there. Therefore, if you access that node and perform a scan, the keys will still be present.

Owner: Interesting, so does the hashing function change? It would take a dive into the valkey code to understand, but this is very interesting.
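The reviewer suggests a single, length-parameterized utility to replace the inline CLUSTER SLOTS checks in both delslots tests. A sketch of that idea; the function name and the `(start, end)` tuple representation are assumptions made so it is self-contained rather than tied to the crate's `Value` type:

```rust
// Hypothetical helper per the review suggestion: one length-parameterized
// check for both delslots tests. Given the (start, end) slot ranges reported
// by CLUSTER SLOTS, report whether any range still covers a slot below
// `deleted_len` (101 for the partial delete, 16384 for the full delete).
fn covers_deleted_range(ranges: &[(i64, i64)], deleted_len: i64) -> bool {
    ranges
        .iter()
        .any(|&(start, end)| start < deleted_len && end >= start)
}
```

The partial-delete test would assert this returns false for `deleted_len = 101`, and the full-delete test for `deleted_len = 16384` (equivalently, an empty range list).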

    #[tokio::test]
    #[serial_test::serial]
    async fn test_async_cluster_scan_with_all_delslots() {
        let cluster = TestClusterContext::new_with_cluster_client_builder(
            3,
            0,
            |builder| builder.retries(1),
            false,
        );
        let mut connection = cluster.async_connection(None).await;
        let mut expected_keys: Vec<String> = Vec::new();

        for i in 0..1000 {
            let key = format!("key{}", i);
            let _: Result<(), redis::RedisError> = redis::cmd("SET")
                .arg(&key)
                .arg("value")
                .query_async(&mut connection)
                .await;
            expected_keys.push(key);
        }

        let mut delslots_cmd = cmd("CLUSTER");
        // delete all slots
        delslots_cmd.arg("DELSLOTSRANGE").arg("0").arg("16383");
        let _: RedisResult<Value> = connection
            .route_command(
                &delslots_cmd,
                RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllNodes,
                    Some(ResponsePolicy::AllSucceeded),
                )),
            )
            .await;

        let mut slots_cmd = cmd("CLUSTER");
        slots_cmd.arg("SLOTS");
        let slots_info: Value = connection
            .route_command(
                &slots_cmd,
                RoutingInfo::MultiNode((
                    MultipleNodeRoutingInfo::AllNodes,
                    Some(ResponsePolicy::OneSucceeded),
                )),
            )
            .await
            .unwrap();

        if let Value::Array(slots) = slots_info {
            assert_eq!(slots.len(), 0);
        }

        let mut scan_state_rc = ScanStateRC::new();
        let mut keys: Vec<String> = Vec::new();
        loop {
            let cluster_scan_args = ClusterScanArgs::builder()
                .allow_non_covered_slots(true)
                .build();
            let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
                .cluster_scan(scan_state_rc, cluster_scan_args)
                .await
                .unwrap();
            scan_state_rc = next_cursor;
            let mut scan_keys = scan_keys
                .into_iter()
                .map(|v| from_redis_value(&v).unwrap())
                .collect::<Vec<String>>();
            keys.append(&mut scan_keys);
            if scan_state_rc.is_finished() {
                break;
            }
        }

        keys.sort();
        expected_keys.sort();
        assert_eq!(keys, expected_keys);
    }

Owner (on the CLUSTER SLOTS check): As in the comment above, a utils function accepting a len can do the job for both cases.

Owner (on the final assertion): If deleting all the slots worked, the cluster should be empty if I'm not mistaken, so something is not working here.

Author: You're right. When slots are deleted, something should stop working, but it seems that's not happening. Looking at the code, particularly in refresh_slots_inner, we need to ensure the cluster's slot map gets properly updated with the latest information when slots are removed. I'll investigate how we can fix this.
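For context on the thread above: the key-to-slot mapping in cluster mode is a fixed function of the key (CRC16 mod 16384, per the cluster specification), independent of which node currently claims the slot. That is consistent with the author's observation that key data stays on the node when only the slot assignment is deleted. A standalone sketch of that mapping; the function names are illustrative, and `{...}` hash-tag handling is omitted for brevity:

```rust
// CRC16, XMODEM variant (poly 0x1021, init 0x0000), as specified for the
// cluster key-to-slot mapping.
fn crc16(data: &[u8]) -> u16 {
    let mut crc: u16 = 0;
    for &byte in data {
        crc ^= (byte as u16) << 8;
        for _ in 0..8 {
            crc = if crc & 0x8000 != 0 {
                (crc << 1) ^ 0x1021
            } else {
                crc << 1
            };
        }
    }
    crc
}

// Slot for a key: CRC16 of the key modulo the 16384 hash slots.
// (The real mapping first reduces the key to its `{...}` hash tag, if any.)
fn key_slot(key: &[u8]) -> u16 {
    crc16(key) % 16384
}
```

Because this mapping never changes, DELSLOTSRANGE only alters which node advertises ownership; it does not rehash or move the keys themselves.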

    #[tokio::test]
    #[serial_test::serial] // test cluster scan with slot migration in the middle
    async fn test_async_cluster_scan_with_migration() {
@@ -262,7 +476,7 @@ mod test_cluster_scan_async {
                .allow_non_covered_slots(true)
                .build();
            loop {
-               let res = connection
+               let res: Result<(ScanStateRC, Vec<Value>), redis::RedisError> = connection
                    .cluster_scan(scan_state_rc.clone(), args.clone())
                    .await;
                let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = match res {

Owner (on the changed line): Did this cause an error? I didn't have any issues with it.