diff --git a/Makefile b/Makefile index 82f31ced..aef0ad45 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ doc: cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps tiup: - tiup playground nightly --mode tikv-slim --kv 1 --without-monitor --kv.config ./config/tikv.toml --pd.config ./config/pd.toml --kv.binpath ../tikv/target/debug/tikv-server & + tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config $(shell pwd)/config/tikv.toml --pd.config $(shell pwd)/config/pd.toml & all: generate check doc test diff --git a/src/raw/client.rs b/src/raw/client.rs index 72ddd4e3..72c0278a 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -233,7 +233,6 @@ impl Client { let request = new_raw_get_request(key, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) - .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -269,7 +268,6 @@ impl Client { let request = new_raw_batch_get_request(keys, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) - .extract_error() .merge(Collect) .plan(); plan.execute().await.map(|r| { @@ -583,7 +581,6 @@ impl Client { ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req) .retry_multi_region(self.backoff.clone()) - .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -663,7 +660,6 @@ impl Client { crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .single_region_with_store(region_store.clone()) .await? - .extract_error() .plan() .execute() .await?; @@ -723,7 +719,6 @@ impl Client { let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) - .extract_error() .merge(Collect) .plan(); plan.execute().await.map(|r| { diff --git a/src/request/plan.rs b/src/request/plan.rs index 52a12519..aefc959d 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -784,54 +784,18 @@ where type Result = P::Result; async fn execute(&self) -> Result { - let result = self.inner.execute().await; - let mut errors = Vec::new(); - match result { - Ok(mut resp) => { - if let Some(e) = resp.key_errors() { - errors.extend(e); - } - if let Some(e) = resp.region_errors() { - errors.extend(e.into_iter().map(|e| Error::RegionError(Box::new(e)))); - } - - if errors.is_empty() { - return Ok(resp); - } - } - Err(Error::MultipleKeyErrors(e)) => errors.extend(e), - Err(e) => errors.push(e), - }; - for error in &mut errors { - match error { - Error::KvError { message } - if message - .contains("Api version in request does not match with TiKV storage") - && message.contains("storage: V1, request: V2") => - { - *error = Error::ServerKeyspaceNotEnabled - } - Error::KvError { message } - if message - .contains("Api version in request does not match with TiKV storage") - && message.contains("storage: V1ttl, request: V2") => - { - *error = Error::ServerKeyspaceNotEnabled - } - Error::KvError { message } - if message - .contains("Api version in request does not match with TiKV storage") - && message.contains("storage: V2, request: V1") => - { - *error = Error::ClientKeyspaceNotEnabled - } - _ => (), - } - } - match errors.len() { - 0 => unreachable!(), - 1 => Err(errors.pop().unwrap()), - _ => Err(Error::ExtractedErrors(errors)), + let result = self.inner.execute().await?; + if let Some(errors) = result.key_errors() { + Err(Error::ExtractedErrors(errors)) + } else if let Some(errors) = result.region_errors() { + Err(Error::ExtractedErrors( + errors + .into_iter() + .map(|e| Error::RegionError(Box::new(e))) + .collect(), + )) + } else { + Ok(result) } } } diff --git a/src/store/errors.rs b/src/store/errors.rs index 8ceb3617..87d0cbba 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -160,11 +160,10 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { } } -impl HasKeyErrors for Result { +impl HasKeyErrors for Result { fn key_errors(&mut self) -> Option> { match self { Ok(x) => x.key_errors(), - Err(Error::MultipleKeyErrors(e)) => Some(std::mem::take(e)), Err(e) => Some(vec![Error::StringError(e.to_string())]), } } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 908a3677..0b94c697 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -299,7 +299,6 @@ impl Client { let req = new_unsafe_destroy_range_request(range); let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req) .all_stores(DEFAULT_STORE_BACKOFF) - .extract_error() .merge(crate::request::Collect) .plan(); plan.execute().await diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 4b644a65..1e2dae24 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -146,7 +146,6 @@ impl Transaction { let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff, api_version) .retry_multi_region(DEFAULT_REGION_BACKOFF) - .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -282,7 +281,6 @@ impl Transaction { let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff, api_version) .retry_multi_region(retry_options.region_backoff) - .extract_error() .merge(Collect) .plan(); plan.execute() @@ -800,7 +798,6 @@ impl Transaction { let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff, api_version) .retry_multi_region(retry_options.region_backoff) - .extract_error() .merge(Collect) .plan(); plan.execute() @@ -861,7 +858,6 @@ impl Transaction { ) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) - .extract_error() .merge(CollectWithShard) .plan(); let pairs = plan.execute().await; @@ -919,7 +915,6 @@ impl Transaction { ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() - .extract_error() .plan(); plan.execute().await?; @@ -989,7 +984,6 @@ impl Transaction { let plan = PlanBuilder::new(rpc.clone(), api_version, request) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) - .extract_error() .plan(); plan.execute().await?; }