Skip to content

Commit

Permalink
support file based transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Mar 19, 2024
1 parent bbaf317 commit 1f546e1
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 70 deletions.
24 changes: 24 additions & 0 deletions proto/kvrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ message PrewriteRequest {
uint64 max_commit_ts = 14;
// The level of assertion to use on this prewrte request.
AssertionLevel assertion_level = 15;

// Reserved for file based transaction.
repeated uint64 txn_file_chunks = 100;
}

message PrewriteResponse {
Expand Down Expand Up @@ -196,6 +199,9 @@ message TxnHeartBeatRequest {
uint64 start_version = 3;
// The new TTL the sender would like.
uint64 advise_lock_ttl = 4;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message TxnHeartBeatResponse {
Expand Down Expand Up @@ -232,6 +238,9 @@ message CheckTxnStatusRequest {
// lock, the transaction status could not be decided if the primary lock is pessimistic too and
// it's still uncertain.
bool resolving_pessimistic_lock = 8;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message CheckTxnStatusResponse {
Expand Down Expand Up @@ -282,6 +291,9 @@ message CommitRequest {
repeated bytes keys = 3;
// Timestamp for the end of the transaction. Must be greater than `start_version`.
uint64 commit_version = 4;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message CommitResponse {
Expand Down Expand Up @@ -348,6 +360,9 @@ message BatchRollbackRequest {
uint64 start_version = 2;
// The keys to rollback.
repeated bytes keys = 3;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message BatchRollbackResponse {
Expand Down Expand Up @@ -387,6 +402,9 @@ message ResolveLockRequest {
repeated TxnInfo txn_infos = 4;
// Only resolve specified keys.
repeated bytes keys = 5;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message ResolveLockResponse {
Expand Down Expand Up @@ -774,6 +792,9 @@ message LockInfo {
bool use_async_commit = 8;
uint64 min_commit_ts = 9;
repeated bytes secondaries = 10;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message KeyError {
Expand Down Expand Up @@ -1009,6 +1030,9 @@ message MvccInfo {
message TxnInfo {
uint64 txn = 1;
uint64 status = 2;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

enum Action {
Expand Down
2 changes: 2 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub enum Error {
inner: Box<Error>,
success_keys: Vec<Vec<u8>>,
},
#[error("Transaction not found error: {:?}", _0)]
TxnNotFound(kvrpcpb::TxnNotFound),
}

impl From<crate::proto::errorpb::Error> for Error {
Expand Down
24 changes: 24 additions & 0 deletions src/generated/kvrpcpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub struct PrewriteRequest {
/// The level of assertion to use on this prewrte request.
#[prost(enumeration = "AssertionLevel", tag = "15")]
pub assertion_level: i32,
/// Reserved for file based transaction.
#[prost(uint64, repeated, tag = "100")]
pub txn_file_chunks: ::prost::alloc::vec::Vec<u64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -255,6 +258,9 @@ pub struct TxnHeartBeatRequest {
/// The new TTL the sender would like.
#[prost(uint64, tag = "4")]
pub advise_lock_ttl: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -304,6 +310,9 @@ pub struct CheckTxnStatusRequest {
/// it's still uncertain.
#[prost(bool, tag = "8")]
pub resolving_pessimistic_lock: bool,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -373,6 +382,9 @@ pub struct CommitRequest {
/// Timestamp for the end of the transaction. Must be greater than `start_version`.
#[prost(uint64, tag = "4")]
pub commit_version: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -470,6 +482,9 @@ pub struct BatchRollbackRequest {
/// The keys to rollback.
#[prost(bytes = "vec", repeated, tag = "3")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -528,6 +543,9 @@ pub struct ResolveLockRequest {
/// Only resolve specified keys.
#[prost(bytes = "vec", repeated, tag = "5")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1050,6 +1068,9 @@ pub struct LockInfo {
pub min_commit_ts: u64,
#[prost(bytes = "vec", repeated, tag = "10")]
pub secondaries: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1344,6 +1365,9 @@ pub struct TxnInfo {
pub txn: u64,
#[prost(uint64, tag = "2")]
pub status: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
2 changes: 0 additions & 2 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,6 @@ mod test {

use super::*;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
Expand Down Expand Up @@ -542,7 +541,6 @@ mod test {
};
let encoded_scan = EncodedRequest::new(scan, client.get_codec());
let plan = crate::request::PlanBuilder::new(client, encoded_scan)
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
Expand Down
7 changes: 4 additions & 3 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ mod test {
use crate::Error;
use crate::Key;
use crate::Result;
use crate::TimestampExt as _;

#[tokio::test]
async fn test_region_retry() {
Expand Down Expand Up @@ -201,7 +202,7 @@ mod test {

let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
.resolve_lock(Timestamp::max(), Backoff::no_jitter_backoff(1, 1, 3))
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
.extract_error()
.plan();
Expand All @@ -228,14 +229,14 @@ mod test {

// does not extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone())
.resolve_lock(OPTIMISTIC_BACKOFF)
.resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.plan();
assert!(plan.execute().await.is_ok());

// extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
.resolve_lock(OPTIMISTIC_BACKOFF)
.resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.extract_error()
.plan();
Expand Down
8 changes: 7 additions & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::transaction::ResolveLocksOptions;
use crate::util::iter::FlatMapOkIterExt;
use crate::Error;
use crate::Result;
use crate::Timestamp;

/// A plan for how to execute a request. A user builds up a plan with various
/// options, then exectutes it.
Expand Down Expand Up @@ -544,6 +545,7 @@ pub struct DefaultProcessor;

pub struct ResolveLock<P: Plan, PdC: PdClient> {
pub inner: P,
pub timestamp: Timestamp,
pub pd_client: Arc<PdC>,
pub backoff: Backoff,
}
Expand All @@ -552,6 +554,7 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
fn clone(&self) -> Self {
ResolveLock {
inner: self.inner.clone(),
timestamp: self.timestamp.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
}
Expand Down Expand Up @@ -579,7 +582,8 @@ where
}

let pd_client = self.pd_client.clone();
let live_locks = resolve_locks(locks, pd_client.clone()).await?;
let live_locks =
resolve_locks(locks, self.timestamp.clone(), pd_client.clone()).await?;
if live_locks.is_empty() {
result = self.inner.execute().await?;
} else {
Expand Down Expand Up @@ -856,6 +860,7 @@ mod test {
use super::*;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb::BatchGetResponse;
use crate::TimestampExt as _;

#[derive(Clone)]
struct ErrPlan;
Expand Down Expand Up @@ -889,6 +894,7 @@ mod test {
let plan = RetryableMultiRegion {
inner: ResolveLock {
inner: ErrPlan,
timestamp: Timestamp::max(),
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
},
Expand Down
8 changes: 7 additions & 1 deletion src/request/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::ResolveLocksOptions;
use crate::Result;
use crate::Timestamp;

/// Builder type for plans (see that module for more).
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
Expand Down Expand Up @@ -69,14 +70,19 @@ impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {

impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
/// If there is a lock error, then resolve the lock and retry the request.
pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
pub fn resolve_lock(
self,
timestamp: Timestamp,
backoff: Backoff,
) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
where
P::Result: HasLocks,
{
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: ResolveLock {
inner: self.plan,
timestamp,
backoff,
pd_client: self.pd_client,
},
Expand Down
4 changes: 4 additions & 0 deletions src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub trait TimestampExt: Sized {
fn from_version(version: u64) -> Self;
/// Convert u64 to an optional timestamp, where `0` represents no timestamp.
fn try_from_version(version: u64) -> Option<Self>;
/// Return the maximum timestamp.
fn max() -> Self {
Self::from_version(u64::MAX)
}
}

impl TimestampExt for Timestamp {
Expand Down
Loading

0 comments on commit 1f546e1

Please sign in to comment.