diff --git a/src/common/errors.rs b/src/common/errors.rs index 59f55776..3e160266 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -113,6 +113,10 @@ pub enum Error { inner: Box, success_keys: Vec>, }, + #[error("Keyspace is not enabled on the server, please enable it by setting `storage.api-version = 2`")] + ServerKeyspaceNotEnabled, + #[error("Keyspace is not enabled on the client, please enable it by `Config::with_default_keyspace()` or `Config::with_keyspace()`")] + ClientKeyspaceNotEnabled, } impl From for Error { diff --git a/src/config.rs b/src/config.rs index 1d808f5a..79dad855 100644 --- a/src/config.rs +++ b/src/config.rs @@ -88,7 +88,7 @@ impl Config { /// Set to use default keyspace. /// - /// Server should enable `api-version = 2` to use this feature. + /// Server should enable `storage.api-version = 2` to use this feature. #[must_use] pub fn with_default_keyspace(self) -> Self { self.with_keyspace("DEFAULT") @@ -96,7 +96,7 @@ impl Config { /// Set the use keyspace for the client. /// - /// Server should enable `api-version = 2` to use this feature. + /// Server should enable `storage.api-version = 2` to use this feature. #[must_use] pub fn with_keyspace(mut self, keyspace: &str) -> Self { self.keyspace = Some(keyspace.to_owned()); diff --git a/src/raw/client.rs b/src/raw/client.rs index 72c0278a..72ddd4e3 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -233,6 +233,7 @@ impl Client { let request = new_raw_get_request(key, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) + .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -268,6 +269,7 @@ impl Client { let request = new_raw_batch_get_request(keys, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) + .extract_error() .merge(Collect) .plan(); plan.execute().await.map(|r| { @@ -581,6 +583,7 @@ impl Client { ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req) .retry_multi_region(self.backoff.clone()) + .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -660,6 +663,7 @@ impl Client { crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .single_region_with_store(region_store.clone()) .await? + .extract_error() .plan() .execute() .await?; @@ -719,6 +723,7 @@ impl Client { let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone()); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) + .extract_error() .merge(Collect) .plan(); plan.execute().await.map(|r| { diff --git a/src/request/api_version.rs b/src/request/api_version.rs index d43db665..76022241 100644 --- a/src/request/api_version.rs +++ b/src/request/api_version.rs @@ -163,7 +163,7 @@ fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_ fn prepend_bytes(vec: &mut Vec, prefix: &[u8; N]) { unsafe { vec.reserve_exact(N); - std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().offset(N as isize), vec.len()); + std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().add(N), vec.len()); std::ptr::copy_nonoverlapping(prefix.as_ptr(), vec.as_mut_ptr(), N); vec.set_len(vec.len() + N); } @@ -172,11 +172,7 @@ fn prepend_bytes(vec: &mut Vec, prefix: &[u8; N]) { fn pretruncate_bytes(vec: &mut Vec) { assert!(vec.len() >= N); unsafe { - std::ptr::copy( - vec.as_ptr().offset(N as isize), - vec.as_mut_ptr(), - vec.len() - N, - ); + std::ptr::copy(vec.as_ptr().add(N), vec.as_mut_ptr(), vec.len() - N); vec.set_len(vec.len() - N); } } diff --git a/src/request/mod.rs b/src/request/mod.rs index 0230cb71..7a7bde1e 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -112,7 +112,7 @@ mod test { #[tokio::test] async fn test_region_retry() { - #[derive(Clone)] + #[derive(Debug, Clone)] struct MockRpcResponse; impl HasKeyErrors for MockRpcResponse { diff --git a/src/request/plan.rs b/src/request/plan.rs index 48ee59d6..52a12519 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -599,7 +599,7 @@ where } } -#[derive(Default)] +#[derive(Debug, Default)] pub struct CleanupLocksResult { pub region_error: Option, pub key_error: Option>, @@ -784,18 +784,54 @@ where type Result = P::Result; async fn execute(&self) -> Result { - let mut result = self.inner.execute().await?; - if let Some(errors) = result.key_errors() { - Err(Error::ExtractedErrors(errors)) - } else if let Some(errors) = result.region_errors() { - Err(Error::ExtractedErrors( - errors - .into_iter() - .map(|e| Error::RegionError(Box::new(e))) - .collect(), - )) - } else { - Ok(result) + let result = self.inner.execute().await; + let mut errors = Vec::new(); + match result { + Ok(mut resp) => { + if let Some(e) = resp.key_errors() { + errors.extend(e); + } + if let Some(e) = resp.region_errors() { + errors.extend(e.into_iter().map(|e| Error::RegionError(Box::new(e)))); + } + + if errors.is_empty() { + return Ok(resp); + } + } + Err(Error::MultipleKeyErrors(e)) => errors.extend(e), + Err(e) => errors.push(e), + }; + for error in &mut errors { + match error { + Error::KvError { message } + if message + .contains("Api version in request does not match with TiKV storage") + && message.contains("storage: V1, request: V2") => + { + *error = Error::ServerKeyspaceNotEnabled + } + Error::KvError { message } + if message + .contains("Api version in request does not match with TiKV storage") + && message.contains("storage: V1ttl, request: V2") => + { + *error = Error::ServerKeyspaceNotEnabled + } + Error::KvError { message } + if message + .contains("Api version in request does not match with TiKV storage") + && message.contains("storage: V2, request: V1") => + { + *error = Error::ClientKeyspaceNotEnabled + } + _ => (), + } + } + match errors.len() { + 0 => unreachable!(), + 1 => Err(errors.pop().unwrap()), + _ => Err(Error::ExtractedErrors(errors)), } } } diff --git a/src/store/errors.rs b/src/store/errors.rs index c9d6c774..8ceb3617 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -1,7 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use std::fmt::Display; - use crate::proto::kvrpcpb; use crate::Error; @@ -162,10 +160,11 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { } } -impl HasKeyErrors for Result { +impl HasKeyErrors for Result { fn key_errors(&mut self) -> Option> { match self { Ok(x) => x.key_errors(), + Err(Error::MultipleKeyErrors(e)) => Some(std::mem::take(e)), Err(e) => Some(vec![Error::StringError(e.to_string())]), } } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 0b94c697..908a3677 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -299,6 +299,7 @@ impl Client { let req = new_unsafe_destroy_range_request(range); let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req) .all_stores(DEFAULT_STORE_BACKOFF) + .extract_error() .merge(crate::request::Collect) .plan(); plan.execute().await diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index f30cadb5..4b644a65 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -146,6 +146,7 @@ impl Transaction { let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff, api_version) .retry_multi_region(DEFAULT_REGION_BACKOFF) + .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -281,6 +282,7 @@ impl Transaction { let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff, api_version) .retry_multi_region(retry_options.region_backoff) + .extract_error() .merge(Collect) .plan(); plan.execute() @@ -333,7 +335,11 @@ impl Transaction { let keys = keys .into_iter() .map(move |k| k.into().encode_version(api_version, KeyMode::Txn)); - self.pessimistic_lock(keys, true).await + let pairs = self + .pessimistic_lock(keys, true) + .await? + .truncate_version(api_version); + Ok(pairs) } } @@ -541,23 +547,14 @@ impl Transaction { /// # Examples /// /// ```rust,no_run - /// # use tikv_client::{Key, Config, TransactionClient, proto::kvrpcpb}; + /// # use tikv_client::{Key, Config, TransactionClient, transaction::Mutation}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); /// let mut txn = client.begin_optimistic().await.unwrap(); /// let mutations = vec![ - /// kvrpcpb::Mutation { - /// op: kvrpcpb::Op::Del.into(), - /// key: b"k0".to_vec(), - /// ..Default::default() - /// }, - /// kvrpcpb::Mutation { - /// op: kvrpcpb::Op::Put.into(), - /// key: b"k1".to_vec(), - /// value: b"v1".to_vec(), - /// ..Default::default() - /// }, + /// Mutation::Delete("k0".to_owned().into()), + /// Mutation::Put("k1".to_owned().into(), b"v1".to_vec()), /// ]; /// txn.batch_mutate(mutations).await.unwrap(); /// txn.commit().await.unwrap(); @@ -574,7 +571,7 @@ impl Transaction { .map(|mutation| mutation.encode_version(self.api_version, KeyMode::Txn)) .collect(); if self.is_pessimistic() { - self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key().clone())), false) + self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false) .await?; for m in mutations { self.buffer.mutate(m); @@ -623,12 +620,11 @@ impl Transaction { match self.options.kind { TransactionKind::Optimistic => { for key in keys { - self.buffer.lock(key.into()); + self.buffer.lock(key); } } TransactionKind::Pessimistic(_) => { - self.pessimistic_lock(keys, false) - .await?; + self.pessimistic_lock(keys, false).await?; } } Ok(()) @@ -771,6 +767,7 @@ impl Transaction { self.api_version, ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) + .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -803,6 +800,7 @@ impl Transaction { let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff, api_version) .retry_multi_region(retry_options.region_backoff) + .extract_error() .merge(Collect) .plan(); plan.execute() @@ -863,6 +861,7 @@ impl Transaction { ) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) + .extract_error() .merge(CollectWithShard) .plan(); let pairs = plan.execute().await; @@ -920,6 +919,7 @@ impl Transaction { ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() + .extract_error() .plan(); plan.execute().await?; @@ -989,6 +989,7 @@ impl Transaction { let plan = PlanBuilder::new(rpc.clone(), api_version, request) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) + .extract_error() .plan(); plan.execute().await?; } @@ -1042,7 +1043,6 @@ impl Drop for Transaction { if self.get_status() == TransactionStatus::Active { match self.options.check_level { CheckLevel::Panic => { - dbg!(&self.timestamp); panic!("Dropping an active transaction. Consider commit or rollback it.") } CheckLevel::Warn => { diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index dbdc4119..d58b4016 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -374,7 +374,7 @@ async fn must_rollbacked(client: &TransactionClient, keys: HashSet>) { async fn count_locks(client: &TransactionClient) -> Result { let ts = client.current_timestamp().await.unwrap(); - let locks = client.scan_locks(&ts, vec![].., 1024).await?; + let locks = client.scan_locks(&ts, .., 1024).await?; // De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes. let locks_set: HashSet> = HashSet::from_iter(locks.into_iter().map(|l| l.key)); Ok(locks_set.len()) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 5d9812d4..0f3bb0cf 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -893,36 +893,17 @@ async fn txn_get_for_update() -> Result<()> { let keys = vec![key1.clone(), key2.clone()]; let mut t1 = client.begin_pessimistic().await?; - let mut t2 = client - .begin_with_options( - TransactionOptions::new_pessimistic().drop_check(tikv_client::CheckLevel::Warn), - ) - .await?; - let mut t3 = client - .begin_with_options( - TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn), - ) - .await?; - let mut t4 = client - .begin_with_options( - TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn), - ) - .await?; + let mut t2 = client.begin_pessimistic().await?; + let mut t3 = client.begin_optimistic().await?; + let mut t4 = client.begin_optimistic().await?; let mut t0 = client.begin_pessimistic().await?; - dbg!(t0.start_timestamp()); - dbg!(t1.start_timestamp()); - dbg!(t2.start_timestamp()); - dbg!(t3.start_timestamp()); - dbg!(t4.start_timestamp()); t0.put(key1.clone(), value1).await?; t0.put(key2.clone(), value2).await?; t0.commit().await?; - drop(t0); assert!(t1.get(key1.clone()).await?.is_none()); assert!(t1.get_for_update(key1.clone()).await?.unwrap() == value1); t1.commit().await?; - drop(t1); assert!(t2.batch_get(keys.clone()).await?.count() == 0); let res: HashMap<_, _> = t2 @@ -932,19 +913,14 @@ async fn txn_get_for_update() -> Result<()> { .map(From::from) .collect(); t2.commit().await?; - drop(t2); assert!(res.get(&key1.clone().into()).unwrap() == &value1); assert!(res.get(&key2.into()).unwrap() == &value2); assert!(t3.get_for_update(key1).await?.is_none()); assert!(t3.commit().await.is_err()); - assert!(t3.rollback().await.is_ok()); - drop(t3); assert!(t4.batch_get_for_update(keys).await?.is_empty()); assert!(t4.commit().await.is_err()); - assert!(t4.rollback().await.is_ok()); - drop(t4); Ok(()) }