Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] txn: Support file based transaction #446

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions proto/kvrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ message PrewriteRequest {
uint64 max_commit_ts = 14;
// The level of assertion to use on this prewrte request.
AssertionLevel assertion_level = 15;

// Reserved for file based transaction.
repeated uint64 txn_file_chunks = 100;
}

message PrewriteResponse {
Expand Down Expand Up @@ -196,6 +199,9 @@ message TxnHeartBeatRequest {
uint64 start_version = 3;
// The new TTL the sender would like.
uint64 advise_lock_ttl = 4;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message TxnHeartBeatResponse {
Expand Down Expand Up @@ -232,6 +238,9 @@ message CheckTxnStatusRequest {
// lock, the transaction status could not be decided if the primary lock is pessimistic too and
// it's still uncertain.
bool resolving_pessimistic_lock = 8;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message CheckTxnStatusResponse {
Expand Down Expand Up @@ -282,6 +291,9 @@ message CommitRequest {
repeated bytes keys = 3;
// Timestamp for the end of the transaction. Must be greater than `start_version`.
uint64 commit_version = 4;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message CommitResponse {
Expand Down Expand Up @@ -348,6 +360,9 @@ message BatchRollbackRequest {
uint64 start_version = 2;
// The keys to rollback.
repeated bytes keys = 3;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message BatchRollbackResponse {
Expand Down Expand Up @@ -387,6 +402,9 @@ message ResolveLockRequest {
repeated TxnInfo txn_infos = 4;
// Only resolve specified keys.
repeated bytes keys = 5;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message ResolveLockResponse {
Expand Down Expand Up @@ -774,6 +792,9 @@ message LockInfo {
bool use_async_commit = 8;
uint64 min_commit_ts = 9;
repeated bytes secondaries = 10;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message KeyError {
Expand Down Expand Up @@ -1009,6 +1030,9 @@ message MvccInfo {
message TxnInfo {
uint64 txn = 1;
uint64 status = 2;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

enum Action {
Expand Down
2 changes: 2 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub enum Error {
inner: Box<Error>,
success_keys: Vec<Vec<u8>>,
},
#[error("Transaction not found error: {:?}", _0)]
TxnNotFound(kvrpcpb::TxnNotFound),
}

impl From<crate::proto::errorpb::Error> for Error {
Expand Down
24 changes: 24 additions & 0 deletions src/generated/kvrpcpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub struct PrewriteRequest {
/// The level of assertion to use on this prewrte request.
#[prost(enumeration = "AssertionLevel", tag = "15")]
pub assertion_level: i32,
/// Reserved for file based transaction.
#[prost(uint64, repeated, tag = "100")]
pub txn_file_chunks: ::prost::alloc::vec::Vec<u64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -255,6 +258,9 @@ pub struct TxnHeartBeatRequest {
/// The new TTL the sender would like.
#[prost(uint64, tag = "4")]
pub advise_lock_ttl: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -304,6 +310,9 @@ pub struct CheckTxnStatusRequest {
/// it's still uncertain.
#[prost(bool, tag = "8")]
pub resolving_pessimistic_lock: bool,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -373,6 +382,9 @@ pub struct CommitRequest {
/// Timestamp for the end of the transaction. Must be greater than `start_version`.
#[prost(uint64, tag = "4")]
pub commit_version: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -470,6 +482,9 @@ pub struct BatchRollbackRequest {
/// The keys to rollback.
#[prost(bytes = "vec", repeated, tag = "3")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -528,6 +543,9 @@ pub struct ResolveLockRequest {
/// Only resolve specified keys.
#[prost(bytes = "vec", repeated, tag = "5")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1050,6 +1068,9 @@ pub struct LockInfo {
pub min_commit_ts: u64,
#[prost(bytes = "vec", repeated, tag = "10")]
pub secondaries: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1344,6 +1365,9 @@ pub struct TxnInfo {
pub txn: u64,
#[prost(uint64, tag = "2")]
pub status: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
2 changes: 0 additions & 2 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,6 @@ mod test {

use super::*;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
Expand Down Expand Up @@ -542,7 +541,6 @@ mod test {
};
let encoded_scan = EncodedRequest::new(scan, client.get_codec());
let plan = crate::request::PlanBuilder::new(client, encoded_scan)
.resolve_lock(OPTIMISTIC_BACKOFF)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
Expand Down
7 changes: 4 additions & 3 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ mod test {
use crate::Error;
use crate::Key;
use crate::Result;
use crate::TimestampExt as _;

#[tokio::test]
async fn test_region_retry() {
Expand Down Expand Up @@ -201,7 +202,7 @@ mod test {

let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
.resolve_lock(Timestamp::max(), Backoff::no_jitter_backoff(1, 1, 3))
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
.extract_error()
.plan();
Expand All @@ -228,14 +229,14 @@ mod test {

// does not extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone())
.resolve_lock(OPTIMISTIC_BACKOFF)
.resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.plan();
assert!(plan.execute().await.is_ok());

// extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
.resolve_lock(OPTIMISTIC_BACKOFF)
.resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.extract_error()
.plan();
Expand Down
8 changes: 7 additions & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::transaction::ResolveLocksOptions;
use crate::util::iter::FlatMapOkIterExt;
use crate::Error;
use crate::Result;
use crate::Timestamp;

/// A plan for how to execute a request. A user builds up a plan with various
/// options, then exectutes it.
Expand Down Expand Up @@ -544,6 +545,7 @@ pub struct DefaultProcessor;

pub struct ResolveLock<P: Plan, PdC: PdClient> {
pub inner: P,
pub timestamp: Timestamp,
pub pd_client: Arc<PdC>,
pub backoff: Backoff,
}
Expand All @@ -552,6 +554,7 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
fn clone(&self) -> Self {
ResolveLock {
inner: self.inner.clone(),
timestamp: self.timestamp.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
}
Expand Down Expand Up @@ -579,7 +582,8 @@ where
}

let pd_client = self.pd_client.clone();
let live_locks = resolve_locks(locks, pd_client.clone()).await?;
let live_locks =
resolve_locks(locks, self.timestamp.clone(), pd_client.clone()).await?;
if live_locks.is_empty() {
result = self.inner.execute().await?;
} else {
Expand Down Expand Up @@ -856,6 +860,7 @@ mod test {
use super::*;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb::BatchGetResponse;
use crate::TimestampExt as _;

#[derive(Clone)]
struct ErrPlan;
Expand Down Expand Up @@ -889,6 +894,7 @@ mod test {
let plan = RetryableMultiRegion {
inner: ResolveLock {
inner: ErrPlan,
timestamp: Timestamp::max(),
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
},
Expand Down
8 changes: 7 additions & 1 deletion src/request/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::ResolveLocksOptions;
use crate::Result;
use crate::Timestamp;

/// Builder type for plans (see that module for more).
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
Expand Down Expand Up @@ -69,14 +70,19 @@ impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted> {

impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
/// If there is a lock error, then resolve the lock and retry the request.
pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
pub fn resolve_lock(
self,
timestamp: Timestamp,
backoff: Backoff,
) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
where
P::Result: HasLocks,
{
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: ResolveLock {
inner: self.plan,
timestamp,
backoff,
pd_client: self.pd_client,
},
Expand Down
4 changes: 4 additions & 0 deletions src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub trait TimestampExt: Sized {
fn from_version(version: u64) -> Self;
/// Convert u64 to an optional timestamp, where `0` represents no timestamp.
fn try_from_version(version: u64) -> Option<Self>;
/// Return the maximum timestamp.
fn max() -> Self {
Self::from_version(u64::MAX)
}
}

impl TimestampExt for Timestamp {
Expand Down
Loading