Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Dec 16, 2023
1 parent 8534111 commit 8e2e187
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ doc:
cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps

tiup:
tiup playground nightly --mode tikv-slim --kv 1 --without-monitor --kv.config ./config/tikv.toml --pd.config ./config/pd.toml --kv.binpath ../tikv/target/debug/tikv-server &
tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config $(shell pwd)/config/tikv.toml --pd.config $(shell pwd)/config/pd.toml &

all: generate check doc test

Expand Down
5 changes: 0 additions & 5 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ impl<PdC: PdClient> Client<PdC> {
let request = new_raw_get_request(key, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -269,7 +268,6 @@ impl<PdC: PdClient> Client<PdC> {
let request = new_raw_batch_get_request(keys, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(Collect)
.plan();
plan.execute().await.map(|r| {
Expand Down Expand Up @@ -583,7 +581,6 @@ impl<PdC: PdClient> Client<PdC> {
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -663,7 +660,6 @@ impl<PdC: PdClient> Client<PdC> {
crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.single_region_with_store(region_store.clone())
.await?
.extract_error()
.plan()
.execute()
.await?;
Expand Down Expand Up @@ -723,7 +719,6 @@ impl<PdC: PdClient> Client<PdC> {
let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request)
.retry_multi_region(self.backoff.clone())
.extract_error()
.merge(Collect)
.plan();
plan.execute().await.map(|r| {
Expand Down
60 changes: 12 additions & 48 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,54 +784,18 @@ where
type Result = P::Result;

async fn execute(&self) -> Result<Self::Result> {
let result = self.inner.execute().await;
let mut errors = Vec::new();
match result {
Ok(mut resp) => {
if let Some(e) = resp.key_errors() {
errors.extend(e);
}
if let Some(e) = resp.region_errors() {
errors.extend(e.into_iter().map(|e| Error::RegionError(Box::new(e))));
}

if errors.is_empty() {
return Ok(resp);
}
}
Err(Error::MultipleKeyErrors(e)) => errors.extend(e),
Err(e) => errors.push(e),
};
for error in &mut errors {
match error {
Error::KvError { message }
if message
.contains("Api version in request does not match with TiKV storage")
&& message.contains("storage: V1, request: V2") =>
{
*error = Error::ServerKeyspaceNotEnabled
}
Error::KvError { message }
if message
.contains("Api version in request does not match with TiKV storage")
&& message.contains("storage: V1ttl, request: V2") =>
{
*error = Error::ServerKeyspaceNotEnabled
}
Error::KvError { message }
if message
.contains("Api version in request does not match with TiKV storage")
&& message.contains("storage: V2, request: V1") =>
{
*error = Error::ClientKeyspaceNotEnabled
}
_ => (),
}
}
match errors.len() {
0 => unreachable!(),
1 => Err(errors.pop().unwrap()),
_ => Err(Error::ExtractedErrors(errors)),
let result = self.inner.execute().await?;
if let Some(errors) = result.key_errors() {
Err(Error::ExtractedErrors(errors))
} else if let Some(errors) = result.region_errors() {
Err(Error::ExtractedErrors(
errors
.into_iter()
.map(|e| Error::RegionError(Box::new(e)))
.collect(),
))
} else {
Ok(result)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,10 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
}
}

impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
fn key_errors(&mut self) -> Option<Vec<Error>> {
match self {
Ok(x) => x.key_errors(),
Err(Error::MultipleKeyErrors(e)) => Some(std::mem::take(e)),
Err(e) => Some(vec![Error::StringError(e.to_string())]),
}
}
Expand Down
1 change: 0 additions & 1 deletion src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ impl Client {
let req = new_unsafe_destroy_range_request(range);
let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req)
.all_stores(DEFAULT_STORE_BACKOFF)
.extract_error()
.merge(crate::request::Collect)
.plan();
plan.execute().await
Expand Down
6 changes: 0 additions & 6 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc, api_version, request)
.resolve_lock(retry_options.lock_backoff, api_version)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -282,7 +281,6 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc, api_version, request)
.resolve_lock(retry_options.lock_backoff, api_version)
.retry_multi_region(retry_options.region_backoff)
.extract_error()
.merge(Collect)
.plan();
plan.execute()
Expand Down Expand Up @@ -800,7 +798,6 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc, api_version, request)
.resolve_lock(retry_options.lock_backoff, api_version)
.retry_multi_region(retry_options.region_backoff)
.extract_error()
.merge(Collect)
.plan();
plan.execute()
Expand Down Expand Up @@ -861,7 +858,6 @@ impl<PdC: PdClient> Transaction<PdC> {
)
.preserve_shard()
.retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
.extract_error()
.merge(CollectWithShard)
.plan();
let pairs = plan.execute().await;
Expand Down Expand Up @@ -919,7 +915,6 @@ impl<PdC: PdClient> Transaction<PdC> {
)
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.extract_error()
.plan();
plan.execute().await?;

Expand Down Expand Up @@ -989,7 +984,6 @@ impl<PdC: PdClient> Transaction<PdC> {
let plan = PlanBuilder::new(rpc.clone(), api_version, request)
.retry_multi_region(region_backoff.clone())
.merge(CollectSingle)
.extract_error()
.plan();
plan.execute().await?;
}
Expand Down

0 comments on commit 8e2e187

Please sign in to comment.