Skip to content

Commit

Permalink
Return live locks on resolve locks error (#417)
Browse files Browse the repository at this point in the history
* API v2 part1

Signed-off-by: Ping Yu <[email protected]>

* inplace encoding

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* export proto

Signed-off-by: Ping Yu <[email protected]>

* fix set_context

Signed-off-by: Ping Yu <[email protected]>

* get live locks

Signed-off-by: Ping Yu <[email protected]>

* wip

Signed-off-by: Ping Yu <[email protected]>

* add Codec parameter to Transaction & Snapshot

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Aug 30, 2023
1 parent 4b0e844 commit d42b31a
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
3 changes: 2 additions & 1 deletion src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::result;

use thiserror::Error;

use crate::proto::kvrpcpb;
use crate::BoundRange;

/// An error originating from the TiKV client or dependencies.
Expand All @@ -18,7 +19,7 @@ pub enum Error {
DuplicateKeyInsertion,
/// Failed to resolve a lock
#[error("Failed to resolve lock")]
ResolveLockError,
ResolveLockError(Vec<kvrpcpb::LockInfo>),
/// Will raise this error when using a pessimistic txn only operation on an optimistic txn
#[error("Invalid operation for this type of transaction")]
InvalidTransactionType,
Expand Down
7 changes: 4 additions & 3 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,16 @@ where
}

if self.backoff.is_none() {
return Err(Error::ResolveLockError);
return Err(Error::ResolveLockError(locks));
}

let pd_client = self.pd_client.clone();
if resolve_locks(locks, pd_client.clone()).await? {
let live_locks = resolve_locks(locks, pd_client.clone()).await?;
if live_locks.is_empty() {
result = self.inner.execute().await?;
} else {
match clone.backoff.next_delay_duration() {
None => return Err(Error::ResolveLockError),
None => return Err(Error::ResolveLockError(live_locks)),
Some(delay_duration) => {
sleep(delay_duration).await;
result = clone.inner.execute().await?;
Expand Down
26 changes: 12 additions & 14 deletions src/transaction/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::Result;

const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;

/// _Resolves_ the given locks. Returns whether all the given locks are resolved.
/// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved.
///
/// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock,
/// which means the key is finally either committed or rolled back, before we read the value of
Expand All @@ -44,18 +44,16 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
pub async fn resolve_locks(
locks: Vec<kvrpcpb::LockInfo>,
pd_client: Arc<impl PdClient>,
) -> Result<bool> {
) -> Result<Vec<kvrpcpb::LockInfo> /* live_locks */> {
debug!("resolving locks");
let ts = pd_client.clone().get_timestamp().await?;
let mut has_live_locks = false;
let expired_locks = locks.into_iter().filter(|lock| {
let expired = ts.physical - Timestamp::from_version(lock.lock_version).physical
>= lock.lock_ttl as i64;
if !expired {
has_live_locks = true;
}
expired
});
let (expired_locks, live_locks) =
locks
.into_iter()
.partition::<Vec<kvrpcpb::LockInfo>, _>(|lock| {
ts.physical - Timestamp::from_version(lock.lock_version).physical
>= lock.lock_ttl as i64
});

// records the commit version of each primary lock (representing the status of the transaction)
let mut commit_versions: HashMap<u64, u64> = HashMap::new();
Expand Down Expand Up @@ -103,7 +101,7 @@ pub async fn resolve_locks(
.or_insert_with(HashSet::new)
.insert(cleaned_region);
}
Ok(!has_live_locks)
Ok(live_locks)
}

async fn resolve_lock_with_retry(
Expand Down Expand Up @@ -290,12 +288,12 @@ impl LockResolver {
}

match &status.kind {
TransactionStatusKind::Locked(..) => {
TransactionStatusKind::Locked(_, lock_info) => {
error!(
"cleanup_locks fail to clean locks, this result is not expected. txn_id:{}",
txn_id
);
return Err(Error::ResolveLockError);
return Err(Error::ResolveLockError(vec![lock_info.clone()]));
}
TransactionStatusKind::Committed(ts) => txn_infos.insert(txn_id, ts.version()),
TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, 0),
Expand Down
4 changes: 2 additions & 2 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ mod tests {
let input = vec![
Ok(resp1),
Ok(resp_empty_value),
Err(ResolveLockError),
Err(ResolveLockError(vec![])),
Ok(resp_not_found),
];
let result = merger.merge(input);
Expand All @@ -960,7 +960,7 @@ mod tests {
success_keys,
} = result.unwrap_err()
{
assert!(matches!(*inner, ResolveLockError));
assert!(matches!(*inner, ResolveLockError(_)));
assert_eq!(
success_keys,
vec![key1.to_vec(), key2.to_vec(), key3.to_vec(), key4.to_vec()]
Expand Down

0 comments on commit d42b31a

Please sign in to comment.