Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding retryable to scan #456

Merged
merged 12 commits into from
Aug 22, 2024
147 changes: 96 additions & 51 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use core::ops::Range;

use std::str::FromStr;
use std::sync::Arc;

use futures::StreamExt;
use log::debug;
use tokio::time::sleep;

use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::common::Error;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::request::TruncateKeyspace;
use crate::request::{plan, Collect};
use crate::store::{HasRegionError, RegionStore};
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
use crate::Error::RegionError;
use crate::Key;
use crate::KvPair;
use crate::Result;
Expand Down Expand Up @@ -755,57 +758,37 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}

let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let backoff = DEFAULT_STORE_BACKOFF;
let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let mut result = Vec::new();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
.next()
.await
.ok_or(Error::RegionForRangeNotFound {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;

while cur_limit > 0 {
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
let mut current_limit = limit;
let (start_key, end_key) = range.clone().into_keys();
let mut current_key: Key = start_key;

while current_limit > 0 {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
end_key: end_key.clone(),
limit: current_limit,
key_only,
reverse,
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);

// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Some(Ok(rs)) => {
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
rs
}
Some(Err(e)) => return Err(e),
None => break,
};
cur_limit -= res_len as u32;
} else {
backoff: backoff.clone(),
};
let (res, next_key) = self.retryable_scan(scan_args).await?;

let mut kvs = res
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
.unwrap_or(Vec::new());

if !kvs.is_empty() {
current_limit -= kvs.len() as u32;
result.append(&mut kvs);
}
if end_key.clone().is_some_and(|ek| ek <= next_key) {
break;
} else {
current_key = next_key;
range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);

}
}

Expand All @@ -818,6 +801,58 @@ impl<PdC: PdClient> Client<PdC> {
Ok(result)
}

async fn retryable_scan(
&self,
mut scan_args: ScanInnerArgs,
) -> Result<(Option<RawScanResponse>, Key)> {
let start_key = scan_args.start_key;
let end_key = scan_args.end_key;
loop {
let region = self.rpc.clone().region_for_key(&start_key).await?;
let store = self.rpc.clone().store_for_id(region.id()).await?;
let request = new_raw_scan_request(
(start_key.clone(), end_key.clone()).into(),
scan_args.limit,
scan_args.key_only,
scan_args.reverse,
self.cf.clone(),
);
let resp = self.do_store_scan(store.clone(), request.clone()).await;
return match resp {
Ok(mut r) => {
if let Some(err) = r.region_error() {
let status =
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
.await?;
if status {
continue;
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
sleep(duration).await;
continue;
} else {
return Err(RegionError(Box::new(err)));
}
}
Ok((Some(r), region.end_key()))
}
Err(err) => Err(err),
};
}
}

/// Sends `scan_request` to the given `store` as a single-region plan and
/// returns the raw scan response.
///
/// This method does not itself inspect the response for region errors; the
/// caller is expected to do so.
///
/// # Errors
///
/// Propagates any error from binding the plan to `store` or from executing it.
async fn do_store_scan(
    &self,
    store: RegionStore,
    scan_request: RawScanRequest,
) -> Result<RawScanResponse> {
    let builder =
        crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request);
    // `store` is owned and not used again, so it is moved instead of cloned.
    let bound = builder.single_region_with_store(store).await?;
    bound.plan().execute().await
}

async fn batch_scan_inner(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
Expand Down Expand Up @@ -864,6 +899,16 @@ impl<PdC: PdClient> Client<PdC> {
}
}

/// Arguments for a single `retryable_scan` call.
#[derive(Clone)]
struct ScanInnerArgs {
/// Key used to look up the target region; also the start of the scanned range.
start_key: Key,
/// End of the scanned range; `None` when the caller supplies no end key.
end_key: Option<Key>,
/// Limit forwarded to the raw scan request.
limit: u32,
/// Forwarded to the raw scan request as its `key_only` flag.
key_only: bool,
/// Forwarded to the raw scan request as its `reverse` flag.
reverse: bool,
/// Supplies the delay between retries when a region error is not resolved
/// by `plan::handle_region_error`; the scan fails once it yields no delay.
backoff: Backoff,
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand Down
Loading
Loading