From 6e529127241b7b43194eef06754b27b73f74b854 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Mon, 17 Jun 2024 14:08:33 -0700 Subject: [PATCH 01/10] adding retryable to scan Signed-off-by: limbooverlambda --- src/raw/client.rs | 156 ++++++++++++++++++++++++------------ src/request/plan.rs | 189 +++++++++++++++++++++----------------------- 2 files changed, 198 insertions(+), 147 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 76d40b65..8c9171bb 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1,29 +1,34 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use async_recursion::async_recursion; use core::ops::Range; use std::str::FromStr; use std::sync::Arc; -use futures::StreamExt; use log::debug; +use tokio::sync::Semaphore; +use tokio::time::sleep; -use crate::backoff::DEFAULT_REGION_BACKOFF; +use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::common::Error; use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; +use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse}; use crate::proto::metapb; use crate::raw::lowering::*; -use crate::request::Collect; use crate::request::CollectSingle; use crate::request::EncodeKeyspace; use crate::request::KeyMode; use crate::request::Keyspace; use crate::request::Plan; use crate::request::TruncateKeyspace; +use crate::request::{plan, Collect}; +use crate::store::{HasRegionError, RegionStore}; use crate::Backoff; use crate::BoundRange; use crate::ColumnFamily; +use crate::Error::RegionError; use crate::Key; use crate::KvPair; use crate::Result; @@ -755,57 +760,42 @@ impl Client { max_limit: MAX_RAW_KV_SCAN_LIMIT, }); } - - let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); + let backoff = DEFAULT_STORE_BACKOFF; + let permits = Arc::new(Semaphore::new(16)); + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let mut result = Vec::new(); - let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed(); - let mut region_store = - scan_regions - .next() - .await - .ok_or(Error::RegionForRangeNotFound { - range: (cur_range.clone()), - })??; - let mut cur_limit = limit; - - while cur_limit > 0 { - let request = new_raw_scan_request( - cur_range.clone(), - cur_limit, + let mut current_limit = limit; + let (start_key, end_key) = range.clone().into_keys(); + let mut current_key: Option = Some(start_key); + while current_limit > 0 { + let scan_args = ScanInnerArgs { + start_key: current_key.clone(), + range: range.clone(), + limit, key_only, reverse, - self.cf.clone(), - ); - let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) - .single_region_with_store(region_store.clone()) - .await? 
- .plan() - .execute() - .await?; - let mut region_scan_res = resp - .kvs - .into_iter() - .map(Into::into) - .collect::>(); - let res_len = region_scan_res.len(); - result.append(&mut region_scan_res); - - // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region - if res_len < cur_limit as usize { - region_store = match scan_regions.next().await { - Some(Ok(rs)) => { - cur_range = BoundRange::new( - std::ops::Bound::Included(region_store.region_with_leader.range().1), - cur_range.to, - ); - rs - } - Some(Err(e)) => return Err(e), - None => break, - }; - cur_limit -= res_len as u32; - } else { + permits: permits.clone(), + backoff: backoff.clone(), + }; + let (res, next_key) = self.retryable_scan(scan_args).await?; + + let mut kvs = res + .map(|r| r.kvs.into_iter().map(Into::into).collect::>()) + .unwrap_or(Vec::new()); + + if !kvs.is_empty() { + current_limit -= kvs.len() as u32; + result.append(&mut kvs); + } + if end_key + .as_ref() + .map(|ek| ek <= next_key.as_ref() && !ek.is_empty()) + .unwrap_or(false) + || next_key.is_empty() + { break; + } else { + current_key = Some(next_key); } } @@ -818,6 +808,61 @@ impl Client { Ok(result) } + #[async_recursion] + async fn retryable_scan( + &self, + mut scan_args: ScanInnerArgs, + ) -> Result<(Option, Key)> { + let start_key = match scan_args.start_key { + None => return Ok((None, Key::EMPTY)), + Some(ref sk) => sk, + }; + let permit = scan_args.permits.acquire().await.unwrap(); + let region = self.rpc.clone().region_for_key(start_key).await?; + let store = self.rpc.clone().store_for_id(region.id()).await?; + let request = new_raw_scan_request( + scan_args.range.clone(), + scan_args.limit, + scan_args.key_only, + scan_args.reverse, + self.cf.clone(), + ); + let resp = self.do_store_scan(store.clone(), request).await; + drop(permit); + match resp { + Ok(mut r) => { + if let Some(err) = r.region_error() { + let status = + plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone()) + .await?; + return if status { + self.retryable_scan(scan_args.clone()).await + } else if let Some(duration) = scan_args.backoff.next_delay_duration() { + sleep(duration).await; + self.retryable_scan(scan_args.clone()).await + } else { + Err(RegionError(Box::new(err))) + }; + } + Ok((Some(r), region.end_key())) + } + Err(err) => Err(err), + } + } + + async fn do_store_scan( + &self, + store: RegionStore, + scan_request: RawScanRequest, + ) -> Result { + crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request) + .single_region_with_store(store.clone()) + .await? + .plan() + .execute() + .await + } + async fn batch_scan_inner( &self, ranges: impl IntoIterator>, @@ -864,6 +909,17 @@ impl Client { } } +#[derive(Clone)] +struct ScanInnerArgs { + start_key: Option, + range: BoundRange, + limit: u32, + key_only: bool, + reverse: bool, + permits: Arc, + backoff: Backoff, +} + #[cfg(test)] mod tests { use std::any::Any; diff --git a/src/request/plan.rs b/src/request/plan.rs index 369a2ff1..ffff6c24 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -187,7 +187,7 @@ where match backoff.next_delay_duration() { Some(duration) => { let region_error_resolved = - Self::handle_region_error(pd_client.clone(), e, region_store).await?; + handle_region_error(pd_client.clone(), e, region_store).await?; // don't sleep if we have resolved the region error if !region_error_resolved { sleep(duration).await; @@ -208,102 +208,6 @@ where } } - // Returns - // 1. 
Ok(true): error has been resolved, retry immediately - // 2. Ok(false): backoff, and then retry - // 3. Err(Error): can't be resolved, return the error to upper level - async fn handle_region_error( - pd_client: Arc, - e: errorpb::Error, - region_store: RegionStore, - ) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if let Some(not_leader) = e.not_leader { - if let Some(leader) = not_leader.leader { - match pd_client - .update_leader(region_store.region_with_leader.ver_id(), leader) - .await - { - Ok(_) => Ok(true), - Err(e) => { - pd_client.invalidate_region_cache(ver_id).await; - Err(e) - } - } - } else { - // The peer doesn't know who is the current leader. Generally it's because - // the Raft group is in an election, but it's possible that the peer is - // isolated and removed from the Raft group. So it's necessary to reload - // the region from PD. - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - } else if e.store_not_match.is_some() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.epoch_not_match.is_some() { - Self::on_region_epoch_not_match( - pd_client.clone(), - region_store, - e.epoch_not_match.unwrap(), - ) - .await - } else if e.stale_command.is_some() || e.region_not_found.is_some() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.server_is_busy.is_some() - || e.raft_entry_too_large.is_some() - || e.max_timestamp_not_synced.is_some() - { - Err(Error::RegionError(Box::new(e))) - } else { - // TODO: pass the logger around - // info!("unknwon region error: {:?}", e); - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - } - - // Returns - // 1. Ok(true): error has been resolved, retry immediately - // 2. Ok(false): backoff, and then retry - // 3. Err(Error): can't be resolved, return the error to upper level - async fn on_region_epoch_not_match( - pd_client: Arc, - region_store: RegionStore, - error: EpochNotMatch, - ) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if error.current_regions.is_empty() { - pd_client.invalidate_region_cache(ver_id).await; - return Ok(true); - } - - for r in error.current_regions { - if r.id == region_store.region_with_leader.id() { - let region_epoch = r.region_epoch.unwrap(); - let returned_conf_ver = region_epoch.conf_ver; - let returned_version = region_epoch.version; - let current_region_epoch = region_store - .region_with_leader - .region - .region_epoch - .clone() - .unwrap(); - let current_conf_ver = current_region_epoch.conf_ver; - let current_version = current_region_epoch.version; - - // Find whether the current region is ahead of TiKV's. If so, backoff. - if returned_conf_ver < current_conf_ver || returned_version < current_version { - return Ok(false); - } - } - } - // TODO: finer grained processing - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - async fn handle_grpc_error( pd_client: Arc, plan: P, @@ -333,6 +237,97 @@ where } } +// Returns +// 1. Ok(true): error has been resolved, retry immediately +// 2. Ok(false): backoff, and then retry +// 3. 
Err(Error): can't be resolved, return the error to upper level +pub(crate) async fn handle_region_error( + pd_client: Arc, + e: errorpb::Error, + region_store: RegionStore, +) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if let Some(not_leader) = e.not_leader { + if let Some(leader) = not_leader.leader { + match pd_client + .update_leader(region_store.region_with_leader.ver_id(), leader) + .await + { + Ok(_) => Ok(true), + Err(e) => { + pd_client.invalidate_region_cache(ver_id).await; + Err(e) + } + } + } else { + // The peer doesn't know who is the current leader. Generally it's because + // the Raft group is in an election, but it's possible that the peer is + // isolated and removed from the Raft group. So it's necessary to reload + // the region from PD. + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + } else if e.store_not_match.is_some() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.epoch_not_match.is_some() { + on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await + } else if e.stale_command.is_some() || e.region_not_found.is_some() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.server_is_busy.is_some() + || e.raft_entry_too_large.is_some() + || e.max_timestamp_not_synced.is_some() + { + Err(Error::RegionError(Box::new(e))) + } else { + // TODO: pass the logger around + // info!("unknwon region error: {:?}", e); + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } +} + +// Returns +// 1. Ok(true): error has been resolved, retry immediately +// 2. Ok(false): backoff, and then retry +// 3. Err(Error): can't be resolved, return the error to upper level +pub(crate) async fn on_region_epoch_not_match( + pd_client: Arc, + region_store: RegionStore, + error: EpochNotMatch, +) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if error.current_regions.is_empty() { + pd_client.invalidate_region_cache(ver_id).await; + return Ok(true); + } + + for r in error.current_regions { + if r.id == region_store.region_with_leader.id() { + let region_epoch = r.region_epoch.unwrap(); + let returned_conf_ver = region_epoch.conf_ver; + let returned_version = region_epoch.version; + let current_region_epoch = region_store + .region_with_leader + .region + .region_epoch + .clone() + .unwrap(); + let current_conf_ver = current_region_epoch.conf_ver; + let current_version = current_region_epoch.version; + + // Find whether the current region is ahead of TiKV's. If so, backoff. + if returned_conf_ver < current_conf_ver || returned_version < current_version { + return Ok(false); + } + } + } + // TODO: finer grained processing + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) +} + impl Clone for RetryableMultiRegion { fn clone(&self) -> Self { RetryableMultiRegion { From da1785ed7f50a1951722041eea6a6e42d226a75e Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Tue, 25 Jun 2024 13:13:30 -0700 Subject: [PATCH 02/10] PR feedback along with a unit test Signed-off-by: limbooverlambda --- src/raw/client.rs | 146 +++++++++++++++++++++++++++++++++------------- 1 file changed, 106 insertions(+), 40 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 8c9171bb..c97ebda2 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1,12 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
- -use async_recursion::async_recursion; use core::ops::Range; +use std::future::Future; +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use log::debug; -use tokio::sync::Semaphore; use tokio::time::sleep; use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; @@ -15,7 +14,7 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse}; -use crate::proto::metapb; +use crate::proto::{errorpb, metapb}; use crate::raw::lowering::*; use crate::request::CollectSingle; use crate::request::EncodeKeyspace; @@ -761,12 +760,16 @@ impl Client { }); } let backoff = DEFAULT_STORE_BACKOFF; - let permits = Arc::new(Semaphore::new(16)); let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let mut result = Vec::new(); let mut current_limit = limit; let (start_key, end_key) = range.clone().into_keys(); - let mut current_key: Option = Some(start_key); + let mut current_key: Key = start_key; + + let region_error_handler = + |pd_rpc_client: Arc, err: errorpb::Error, store: RegionStore| { + Box::pin(plan::handle_region_error(pd_rpc_client, err, store)) + } as _; while current_limit > 0 { let scan_args = ScanInnerArgs { start_key: current_key.clone(), @@ -774,10 +777,9 @@ impl Client { limit, key_only, reverse, - permits: permits.clone(), backoff: backoff.clone(), }; - let (res, next_key) = self.retryable_scan(scan_args).await?; + let (res, next_key) = self.retryable_scan(scan_args, region_error_handler).await?; let mut kvs = res .map(|r| r.kvs.into_iter().map(Into::into).collect::>()) @@ -789,13 +791,13 @@ impl Client { } if end_key .as_ref() - .map(|ek| ek <= next_key.as_ref() && !ek.is_empty()) + .map(|ek| ek <= next_key.as_ref()) .unwrap_or(false) || next_key.is_empty() { break; } else { - current_key = Some(next_key); + current_key = next_key; } } @@ -808,17 +810,21 @@ impl Client { Ok(result) } - #[async_recursion] - async fn retryable_scan( + async fn retryable_scan<'a, F>( &self, mut scan_args: ScanInnerArgs, - ) -> Result<(Option, Key)> { - let start_key = match scan_args.start_key { - None => return Ok((None, Key::EMPTY)), - Some(ref sk) => sk, - }; - let permit = scan_args.permits.acquire().await.unwrap(); - let region = self.rpc.clone().region_for_key(start_key).await?; + mut error_handler: F, + ) -> Result<(Option, Key)> + where + F: FnMut( + Arc, + errorpb::Error, + RegionStore, + ) -> Pin>>>, + { + let start_key = scan_args.start_key; + + let region = self.rpc.clone().region_for_key(&start_key).await?; let store = self.rpc.clone().store_for_id(region.id()).await?; let request = new_raw_scan_request( scan_args.range.clone(), @@ -827,26 +833,26 @@ impl Client { scan_args.reverse, self.cf.clone(), ); - let resp = self.do_store_scan(store.clone(), request).await; - drop(permit); - match resp { - Ok(mut r) => { - if let Some(err) = r.region_error() { - let status = - plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone()) - .await?; - return if status { - self.retryable_scan(scan_args.clone()).await - } else if let Some(duration) = scan_args.backoff.next_delay_duration() { - sleep(duration).await; - self.retryable_scan(scan_args.clone()).await - } else { - Err(RegionError(Box::new(err))) - }; + loop { + let resp = self.do_store_scan(store.clone(), request.clone()).await; + return match resp { + Ok(mut r) => { + if let Some(err) = r.region_error() { + let status = + error_handler(self.rpc.clone(), err.clone(), store.clone()).await?; + if status { + continue; + 
} else if let Some(duration) = scan_args.backoff.next_delay_duration() { + sleep(duration).await; + continue; + } else { + return Err(RegionError(Box::new(err))); + } + } + Ok((Some(r), region.end_key())) } - Ok((Some(r), region.end_key())) - } - Err(err) => Err(err), + Err(err) => Err(err), + }; } } @@ -911,23 +917,24 @@ impl Client { #[derive(Clone)] struct ScanInnerArgs { - start_key: Option, + start_key: Key, range: BoundRange, limit: u32, key_only: bool, reverse: bool, - permits: Arc, backoff: Backoff, } #[cfg(test)] mod tests { use std::any::Any; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; + use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; use crate::Result; @@ -1009,4 +1016,63 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_raw_scan_retryer() -> Result<()> { + let epoch_not_match_error = EpochNotMatch { + current_regions: vec![], + }; + let region_error = errorpb::Error { + epoch_not_match: Some(epoch_not_match_error), + ..Default::default() + }; + let flag = Arc::new(AtomicBool::new(true)); + let tikv_flag = flag.clone(); + let error_handler_flag = flag.clone(); + let mock_tikv_client = MockKvClient::with_dispatch_hook(move |req: &dyn Any| { + if let Some(_) = req.downcast_ref::() { + let v = tikv_flag.clone().load(Ordering::Relaxed); + let resp = if v { + RawScanResponse { + region_error: Some(region_error.clone()), + ..Default::default() + } + } else { + RawScanResponse { + ..Default::default() + } + }; + Ok(Box::new(resp) as Box) + } else { + unreachable!() + } + }); + let mock_pd_client = MockPdClient::new(mock_tikv_client); + let client = Client { + rpc: Arc::new(mock_pd_client), + cf: Some(ColumnFamily::Default), + backoff: DEFAULT_REGION_BACKOFF, + atomic: false, + keyspace: Keyspace::Enable { keyspace_id: 0 }, + }; + + let scan_args = ScanInnerArgs { + start_key: "k1".to_string().into(), + range: BoundRange::from("k1".to_owned().."k2".to_owned()), + limit: 10, + key_only: false, + reverse: false, + backoff: Backoff::no_backoff(), + }; + let error_handler = |_, _, _| { + error_handler_flag.clone().store(false, Ordering::Relaxed); + Box::pin(async move { + let res: Result = Ok(true); + res + }) + } as _; + client.retryable_scan(scan_args, error_handler).await?; + assert!(!error_handler_flag.load(Ordering::Relaxed)); + Ok(()) + } } From dcb6b894e6d7fd7ec15e962bf1f4a31f937bd500 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Wed, 26 Jun 2024 00:33:01 -0700 Subject: [PATCH 03/10] make check fixes Signed-off-by: limbooverlambda --- src/raw/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index c97ebda2..5afb213b 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1030,7 +1030,7 @@ mod tests { let tikv_flag = flag.clone(); let error_handler_flag = flag.clone(); let mock_tikv_client = MockKvClient::with_dispatch_hook(move |req: &dyn Any| { - if let Some(_) = req.downcast_ref::() { + if req.downcast_ref::().is_some() { let v = tikv_flag.clone().load(Ordering::Relaxed); let resp = if v { RawScanResponse { From 181cde120a6ed1cb3d751f8974f7e4104aa800ad Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Mon, 8 Jul 2024 13:12:50 -0700 Subject: [PATCH 04/10] address PR feedback Signed-off-by: limbooverlambda --- src/raw/client.rs | 70 ++--------------------------------------------- 1 file changed, 2 insertions(+), 68 deletions(-) diff --git a/src/raw/client.rs 
b/src/raw/client.rs index 5afb213b..a466079e 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -774,7 +774,7 @@ impl Client { let scan_args = ScanInnerArgs { start_key: current_key.clone(), range: range.clone(), - limit, + limit: current_limit, key_only, reverse, backoff: backoff.clone(), @@ -789,12 +789,7 @@ impl Client { current_limit -= kvs.len() as u32; result.append(&mut kvs); } - if end_key - .as_ref() - .map(|ek| ek <= next_key.as_ref()) - .unwrap_or(false) - || next_key.is_empty() - { + if end_key.clone().is_some_and(|ek| ek <= next_key) { break; } else { current_key = next_key; @@ -928,13 +923,11 @@ struct ScanInnerArgs { #[cfg(test)] mod tests { use std::any::Any; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; - use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; use crate::Result; @@ -1016,63 +1009,4 @@ mod tests { ); Ok(()) } - - #[tokio::test] - async fn test_raw_scan_retryer() -> Result<()> { - let epoch_not_match_error = EpochNotMatch { - current_regions: vec![], - }; - let region_error = errorpb::Error { - epoch_not_match: Some(epoch_not_match_error), - ..Default::default() - }; - let flag = Arc::new(AtomicBool::new(true)); - let tikv_flag = flag.clone(); - let error_handler_flag = flag.clone(); - let mock_tikv_client = MockKvClient::with_dispatch_hook(move |req: &dyn Any| { - if req.downcast_ref::().is_some() { - let v = tikv_flag.clone().load(Ordering::Relaxed); - let resp = if v { - RawScanResponse { - region_error: Some(region_error.clone()), - ..Default::default() - } - } else { - RawScanResponse { - ..Default::default() - } - }; - Ok(Box::new(resp) as Box) - } else { - unreachable!() - } - }); - let mock_pd_client = MockPdClient::new(mock_tikv_client); - let client = Client { - rpc: Arc::new(mock_pd_client), - cf: Some(ColumnFamily::Default), - backoff: DEFAULT_REGION_BACKOFF, - atomic: false, - keyspace: Keyspace::Enable { keyspace_id: 0 }, - }; - - let scan_args = ScanInnerArgs { - start_key: "k1".to_string().into(), - range: BoundRange::from("k1".to_owned().."k2".to_owned()), - limit: 10, - key_only: false, - reverse: false, - backoff: Backoff::no_backoff(), - }; - let error_handler = |_, _, _| { - error_handler_flag.clone().store(false, Ordering::Relaxed); - Box::pin(async move { - let res: Result = Ok(true); - res - }) - } as _; - client.retryable_scan(scan_args, error_handler).await?; - assert!(!error_handler_flag.load(Ordering::Relaxed)); - Ok(()) - } } From 3ee7efffda54a7f55a4dc2a3c2b7e566a13204a9 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Mon, 5 Aug 2024 16:28:05 -0700 Subject: [PATCH 05/10] removing the lambda to simplify the retry logic Signed-off-by: limbooverlambda --- src/raw/client.rs | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index a466079e..d535cf5c 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1,7 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
use core::ops::Range; -use std::future::Future; -use std::pin::Pin; + use std::str::FromStr; use std::sync::Arc; @@ -14,7 +13,7 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse}; -use crate::proto::{errorpb, metapb}; +use crate::proto::metapb; use crate::raw::lowering::*; use crate::request::CollectSingle; use crate::request::EncodeKeyspace; @@ -766,10 +765,6 @@ impl Client { let (start_key, end_key) = range.clone().into_keys(); let mut current_key: Key = start_key; - let region_error_handler = - |pd_rpc_client: Arc, err: errorpb::Error, store: RegionStore| { - Box::pin(plan::handle_region_error(pd_rpc_client, err, store)) - } as _; while current_limit > 0 { let scan_args = ScanInnerArgs { start_key: current_key.clone(), @@ -779,7 +774,7 @@ impl Client { reverse, backoff: backoff.clone(), }; - let (res, next_key) = self.retryable_scan(scan_args, region_error_handler).await?; + let (res, next_key) = self.retryable_scan(scan_args).await?; let mut kvs = res .map(|r| r.kvs.into_iter().map(Into::into).collect::>()) @@ -805,18 +800,10 @@ impl Client { Ok(result) } - async fn retryable_scan<'a, F>( + async fn retryable_scan( &self, mut scan_args: ScanInnerArgs, - mut error_handler: F, - ) -> Result<(Option, Key)> - where - F: FnMut( - Arc, - errorpb::Error, - RegionStore, - ) -> Pin>>>, - { + ) -> Result<(Option, Key)> { let start_key = scan_args.start_key; let region = self.rpc.clone().region_for_key(&start_key).await?; @@ -834,7 +821,8 @@ impl Client { Ok(mut r) => { if let Some(err) = r.region_error() { let status = - error_handler(self.rpc.clone(), err.clone(), store.clone()).await?; + plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone()) + .await?; if status { continue; } else if let Some(duration) = scan_args.backoff.next_delay_duration() { From 70a3771090496de655c40854eba1b25e1d217d7a Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Thu, 8 Aug 2024 13:55:37 -0700 Subject: [PATCH 06/10] adding protobuf-code feature to cargo.toml Signed-off-by: limbooverlambda --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index a61b7992..6e8cd769 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ prometheus = ["prometheus/push", "prometheus/process"] # Enable integration tests with a running TiKV and PD instance. # Use $PD_ADDRS, comma separated, to set the addresses the tests use. 
integration-tests = [] +protobuf-codec = [] [lib] name = "tikv_client" From 8bdd1d51353d0866fa83ba8dac616d9461b8bde9 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Thu, 8 Aug 2024 16:52:51 -0700 Subject: [PATCH 07/10] added clippy fix for the formatting issues with the latest stable Signed-off-by: limbooverlambda --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index aef0ad45..2a52fab3 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ generate: check: generate cargo check --all --all-targets --features "${ALL_FEATURES}" cargo fmt -- --check - cargo clippy --all-targets --features "${ALL_FEATURES}" -- -D clippy::all + cargo clippy --fix --allow-dirty --all-targets --features "${ALL_FEATURES}" -- -D clippy::all unit-test: generate cargo nextest run --all --no-default-features From 6b40932b1c96a05d25e6d378035c14eac9a0d7b3 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Sat, 10 Aug 2024 16:02:07 -0700 Subject: [PATCH 08/10] moving the region and store lookup inside the loop and removing the clippy fix from Makefile Signed-off-by: limbooverlambda --- Makefile | 2 +- src/proto.rs | 1 + src/raw/client.rs | 18 +++++++++--------- src/transaction/client.rs | 2 +- src/transaction/requests.rs | 1 - 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index 2a52fab3..aef0ad45 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ generate: check: generate cargo check --all --all-targets --features "${ALL_FEATURES}" cargo fmt -- --check - cargo clippy --fix --allow-dirty --all-targets --features "${ALL_FEATURES}" -- -D clippy::all + cargo clippy --all-targets --features "${ALL_FEATURES}" -- -D clippy::all unit-test: generate cargo nextest run --all --no-default-features diff --git a/src/proto.rs b/src/proto.rs index 30f699f9..c86a819a 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -5,6 +5,7 @@ pub use protos::*; +#[allow(clippy::doc_lazy_continuation)] mod protos { include!("generated/mod.rs"); } diff --git a/src/raw/client.rs b/src/raw/client.rs index d535cf5c..e5a996ab 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -806,16 +806,16 @@ impl Client { ) -> Result<(Option, Key)> { let start_key = scan_args.start_key; - let region = self.rpc.clone().region_for_key(&start_key).await?; - let store = self.rpc.clone().store_for_id(region.id()).await?; - let request = new_raw_scan_request( - scan_args.range.clone(), - scan_args.limit, - scan_args.key_only, - scan_args.reverse, - self.cf.clone(), - ); loop { + let region = self.rpc.clone().region_for_key(&start_key).await?; + let store = self.rpc.clone().store_for_id(region.id()).await?; + let request = new_raw_scan_request( + scan_args.range.clone(), + scan_args.limit, + scan_args.key_only, + scan_args.reverse, + self.cf.clone(), + ); let resp = self.do_store_scan(store.clone(), request.clone()).await; return match resp { Ok(mut r) => { diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 16d5c4f6..f3ea884c 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -42,7 +42,7 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// - `gc`: trigger a GC process which clears stale data in the cluster. /// - `current_timestamp`: get the current `Timestamp` from PD. /// - `snapshot`: get a [`Snapshot`] of the database at a specified timestamp. -/// A `Snapshot` is a read-only transaction. +/// A `Snapshot` is a read-only transaction. 
 ///
 /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
 /// awaited to execute.
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index 231c9e5a..6a5538d9 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -904,7 +904,6 @@ impl Merge for Collect {
 }
 
 #[cfg(test)]
-#[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))]
 mod tests {
     use crate::common::Error::PessimisticLockError;
     use crate::common::Error::ResolveLockError;

From a7b4b74e0a654a520ae7c20a8d2208a4e86c2d57 Mon Sep 17 00:00:00 2001
From: limbooverlambda
Date: Mon, 12 Aug 2024 12:19:26 -0700
Subject: [PATCH 09/10] added a fix for the bug with the range

Signed-off-by: limbooverlambda
---
 Cargo.toml        | 1 -
 src/raw/client.rs | 4 ++--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6e8cd769..a61b7992 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,7 +14,6 @@ prometheus = ["prometheus/push", "prometheus/process"]
 # Enable integration tests with a running TiKV and PD instance.
 # Use $PD_ADDRS, comma separated, to set the addresses the tests use.
 integration-tests = []
-protobuf-codec = []
 
 [lib]
 name = "tikv_client"
diff --git a/src/raw/client.rs b/src/raw/client.rs
index e5a996ab..a324b68b 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -759,7 +759,7 @@ impl Client {
             });
         }
         let backoff = DEFAULT_STORE_BACKOFF;
-        let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
+        let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
         let mut result = Vec::new();
         let mut current_limit = limit;
         let (start_key, end_key) = range.clone().into_keys();
@@ -788,6 +788,7 @@ impl Client {
                 break;
             } else {
                 current_key = next_key;
+                range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
             }
         }
 
@@ -805,7 +806,6 @@ impl Client {
         mut scan_args: ScanInnerArgs,
     ) -> Result<(Option<RawScanResponse>, Key)> {
         let start_key = scan_args.start_key;
-
         loop {
             let region = self.rpc.clone().region_for_key(&start_key).await?;
             let store = self.rpc.clone().store_for_id(region.id()).await?;

From 0a816f673f781c90396df8403084966a10a7d35c Mon Sep 17 00:00:00 2001
From: limbooverlambda
Date: Mon, 19 Aug 2024 15:31:42 -0700
Subject: [PATCH 10/10] removing the range attribute from ScanInnerArgs and replacing it with end_key

Signed-off-by: limbooverlambda
---
 src/raw/client.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/raw/client.rs b/src/raw/client.rs
index a324b68b..620531ee 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -768,7 +768,7 @@ impl Client {
         while current_limit > 0 {
             let scan_args = ScanInnerArgs {
                 start_key: current_key.clone(),
-                range: range.clone(),
+                end_key: end_key.clone(),
                 limit: current_limit,
                 key_only,
                 reverse,
@@ -806,11 +806,12 @@ impl Client {
         mut scan_args: ScanInnerArgs,
     ) -> Result<(Option<RawScanResponse>, Key)> {
         let start_key = scan_args.start_key;
+        let end_key = scan_args.end_key;
         loop {
             let region = self.rpc.clone().region_for_key(&start_key).await?;
             let store = self.rpc.clone().store_for_id(region.id()).await?;
             let request = new_raw_scan_request(
-                scan_args.range.clone(),
+                (start_key.clone(), end_key.clone()).into(),
                 scan_args.limit,
                 scan_args.key_only,
                 scan_args.reverse,
@@ -901,7 +902,7 @@ impl Client {
 #[derive(Clone)]
 struct ScanInnerArgs {
     start_key: Key,
-    range: BoundRange,
+    end_key: Option<Key>,
     limit: u32,
     key_only: bool,
     reverse: bool,
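
For reference: a minimal caller-side sketch of the raw scan path that this series makes retryable. RawClient::new and scan below are the crate's existing public API; the PD endpoint, the key range, and the tokio runtime setup are illustrative placeholders, and error handling is elided.

    use tikv_client::RawClient;

    #[tokio::main]
    async fn main() -> tikv_client::Result<()> {
        // Placeholder PD endpoint; point this at a real PD deployment.
        let client = RawClient::new(vec!["127.0.0.1:2379"]).await?;

        // Scan at most 10 key-value pairs in the range ["k1", "k5").
        // With this series, the scan retries region errors with backoff and
        // continues from the next region's start key until the limit is reached.
        let pairs = client.scan("k1".to_owned().."k5".to_owned(), 10).await?;
        for pair in pairs {
            println!("{:?}", pair);
        }
        Ok(())
    }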