diff --git a/src/common/errors.rs b/src/common/errors.rs index 251de71d..29622c92 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -4,6 +4,7 @@ use std::result; use thiserror::Error; +use crate::proto::kvrpcpb; use crate::BoundRange; /// An error originating from the TiKV client or dependencies. @@ -18,7 +19,7 @@ pub enum Error { DuplicateKeyInsertion, /// Failed to resolve a lock #[error("Failed to resolve lock")] - ResolveLockError, + ResolveLockError(Vec), /// Will raise this error when using a pessimistic txn only operation on an optimistic txn #[error("Invalid operation for this type of transaction")] InvalidTransactionType, diff --git a/src/request/plan.rs b/src/request/plan.rs index 905e7fad..7cee192e 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -434,15 +434,16 @@ where } if self.backoff.is_none() { - return Err(Error::ResolveLockError); + return Err(Error::ResolveLockError(locks)); } let pd_client = self.pd_client.clone(); - if resolve_locks(locks, pd_client.clone()).await? { + let live_locks = resolve_locks(locks, pd_client.clone()).await?; + if live_locks.is_empty() { result = self.inner.execute().await?; } else { match clone.backoff.next_delay_duration() { - None => return Err(Error::ResolveLockError), + None => return Err(Error::ResolveLockError(live_locks)), Some(delay_duration) => { sleep(delay_duration).await; result = clone.inner.execute().await?; diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 0ccb5e46..0793c4f1 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -34,7 +34,7 @@ use crate::Result; const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; -/// _Resolves_ the given locks. Returns whether all the given locks are resolved. +/// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved. /// /// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock, /// which means the key is finally either committed or rolled back, before we read the value of @@ -44,18 +44,16 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; pub async fn resolve_locks( locks: Vec, pd_client: Arc, -) -> Result { +) -> Result /* live_locks */> { debug!("resolving locks"); let ts = pd_client.clone().get_timestamp().await?; - let mut has_live_locks = false; - let expired_locks = locks.into_iter().filter(|lock| { - let expired = ts.physical - Timestamp::from_version(lock.lock_version).physical - >= lock.lock_ttl as i64; - if !expired { - has_live_locks = true; - } - expired - }); + let (expired_locks, live_locks) = + locks + .into_iter() + .partition::, _>(|lock| { + ts.physical - Timestamp::from_version(lock.lock_version).physical + >= lock.lock_ttl as i64 + }); // records the commit version of each primary lock (representing the status of the transaction) let mut commit_versions: HashMap = HashMap::new(); @@ -103,7 +101,7 @@ pub async fn resolve_locks( .or_insert_with(HashSet::new) .insert(cleaned_region); } - Ok(!has_live_locks) + Ok(live_locks) } async fn resolve_lock_with_retry( @@ -290,12 +288,12 @@ impl LockResolver { } match &status.kind { - TransactionStatusKind::Locked(..) => { + TransactionStatusKind::Locked(_, lock_info) => { error!( "cleanup_locks fail to clean locks, this result is not expected. txn_id:{}", txn_id ); - return Err(Error::ResolveLockError); + return Err(Error::ResolveLockError(vec![lock_info.clone()])); } TransactionStatusKind::Committed(ts) => txn_infos.insert(txn_id, ts.version()), TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, 0), diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index bd3852e3..ca08d46c 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -950,7 +950,7 @@ mod tests { let input = vec![ Ok(resp1), Ok(resp_empty_value), - Err(ResolveLockError), + Err(ResolveLockError(vec![])), Ok(resp_not_found), ]; let result = merger.merge(input); @@ -960,7 +960,7 @@ mod tests { success_keys, } = result.unwrap_err() { - assert!(matches!(*inner, ResolveLockError)); + assert!(matches!(*inner, ResolveLockError(_))); assert_eq!( success_keys, vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()]