Skip to content

Commit

Permalink
Refactor cluster scan method completely
Browse files Browse the repository at this point in the history
Signed-off-by: avifenesh <[email protected]>
  • Loading branch information
avifenesh committed Nov 29, 2024
1 parent d2ac4f5 commit 03b62aa
Show file tree
Hide file tree
Showing 9 changed files with 690 additions and 738 deletions.
153 changes: 33 additions & 120 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ pub mod testing {
use crate::{
client::GlideConnectionOptions,
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
cluster_slotmap::SlotMap,
cluster_topology::{
calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR,
SLOT_SIZE,
},
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict, ToRedisArgs,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC},
FromRedisValue, InfoDict,
};
use dashmap::DashMap;
use std::{
Expand Down Expand Up @@ -142,73 +140,6 @@ where
})
}

/// Special handling for `SCAN` command, using `cluster_scan`.
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
///
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key data type.
/// * `allow_non_covered_slots` - A boolean flag to allow missing slots in the cluster.
///
/// # Returns
///
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
/// structure of returned value:
/// `Ok((ScanStateRC, Vec<Value>))`
///
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
///
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan(scan_state_rc, None, None, false).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
/// .map(|v| from_redis_value(&v).unwrap())
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
/// keys.append(&mut scan_keys);
/// if scan_state_rc.is_finished() {
/// break;
/// }
/// }
/// keys
/// }
/// ```
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
count: Option<usize>,
object_type: Option<ObjectType>,
allow_non_covered_slots: bool,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
None,
count,
object_type,
allow_non_covered_slots,
);
self.route_cluster_scan(cluster_scan_args).await
}

/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
/// It is a special case of [`cluster_scan`], with an additional match pattern.
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
Expand All @@ -219,11 +150,8 @@ where
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `match_pattern` - A match pattern of requested keys.
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key data type.
/// * `allow_non_covered_slots` - A boolean flag to allow missing slots in the cluster.
/// * `cluster_scan_args` - A [`ClusterScanArgs`] struct containing the arguments for the cluster scan command - match pattern, count,
/// object type and the allow_non_covered_slots flag.
///
/// # Returns
///
Expand All @@ -236,17 +164,18 @@ where
/// # Example
/// ```rust,no_run
/// use redis::cluster::ClusterClient;
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
/// use redis::{ScanStateRC, from_redis_value, Value, ObjectType, ClusterScanArgs};
///
/// async fn scan_all_cluster() -> Vec<String> {
/// let nodes = vec!["redis://127.0.0.1/"];
/// let client = ClusterClient::new(nodes).unwrap();
/// let mut connection = client.get_async_connection(None).await.unwrap();
/// let mut scan_state_rc = ScanStateRC::new();
/// let mut keys: Vec<String> = vec![];
/// let cluster_scan_args = ClusterScanArgs::builder().with_count(1000).with_object_type(ObjectType::String).build();
/// loop {
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None, false).await.unwrap();
/// connection.cluster_scan(scan_state_rc, cluster_scan_args.clone()).await.unwrap();
/// scan_state_rc = next_cursor;
/// let mut scan_keys = scan_keys
/// .into_iter()
Expand All @@ -260,21 +189,12 @@ where
/// keys
/// }
/// ```
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
pub async fn cluster_scan(
&mut self,
scan_state_rc: ScanStateRC,
match_pattern: K,
count: Option<usize>,
object_type: Option<ObjectType>,
allow_non_covered_slots: bool,
mut cluster_scan_args: ClusterScanArgs,
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
let cluster_scan_args = ClusterScanArgs::new(
scan_state_rc,
Some(match_pattern.to_redis_args().concat()),
count,
object_type,
allow_non_covered_slots,
);
cluster_scan_args.set_scan_state_cursor(scan_state_rc);
self.route_cluster_scan(cluster_scan_args).await
}

Expand All @@ -290,18 +210,18 @@ where
sender,
})
.await
.map_err(|_| {
.map_err(|e| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"cluster: Unable to send command",
format!("Cluster: Error occurred while trying to send SCAN command to internal send task. {e:?}"),
))
})?;
receiver
.await
.unwrap_or_else(|_| {
.unwrap_or_else(|e| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"cluster: Unable to receive command",
format!("Cluster: Failed to receive SCAN command response from internal send task. {e:?}"),
)))
})
.map(|response| match response {
Expand All @@ -327,18 +247,20 @@ where
sender,
})
.await
.map_err(|_| {
.map_err(|e| {
RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"cluster: Unable to send command",
format!("Cluster: Error occurred while trying to send command to internal sender. {e:?}"),
))
})?;
receiver
.await
.unwrap_or_else(|_| {
.unwrap_or_else(|e| {
Err(RedisError::from(io::Error::new(
io::ErrorKind::BrokenPipe,
"cluster: Unable to receive command",
format!(
"Cluster: Failed to receive command response from internal sender. {e:?}"
),
)))
})
.map(|response| match response {
Expand Down Expand Up @@ -500,19 +422,6 @@ where
.map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_WRITE_ERR)))
}

// return address of node for slot
pub(crate) async fn get_address_from_slot(
&self,
slot: u16,
slot_addr: SlotAddr,
) -> Option<Arc<String>> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.get_node_address_for_slot(slot, slot_addr)
}

// return epoch of node
pub(crate) async fn get_address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
let command = cmd("CLUSTER").arg("INFO").to_owned();
Expand Down Expand Up @@ -552,14 +461,26 @@ where
}
}

// return slots of node
/// return slots of node
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.get_slots_of_node(node_address)
}

/// Get connection for address
pub(crate) async fn get_connection_for_address(
&self,
address: &str,
) -> Option<ConnectionFuture<C>> {
self.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.connection_for_address(address)
.map(|(_, conn)| conn)
}
}

pub(crate) struct ClusterConnInner<C> {
Expand Down Expand Up @@ -1895,14 +1816,6 @@ where
Self::refresh_slots_inner(inner, curr_retry).await
}

pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool {
let mut slots_covered = 0;
for (end, slots) in slot_map.slots.iter() {
slots_covered += end.saturating_sub(slots.start).saturating_add(1);
}
slots_covered == SLOT_SIZE
}

// Query a node to discover slot-> master mappings
async fn refresh_slots_inner(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
let num_of_nodes = inner.conn_lock.read().expect(MUTEX_READ_ERR).len();
Expand Down
Loading

0 comments on commit 03b62aa

Please sign in to comment.