diff --git a/Cargo.toml b/Cargo.toml index 0aab2b03..913b7244 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ regex = "1" semver = "1.0" serde = "1.0" serde_derive = "1.0" +take_mut = "0.2.2" thiserror = "1" tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] } tonic = { version = "0.10", features = ["tls"] } @@ -54,6 +55,7 @@ proptest-derive = "0.3" reqwest = { version = "0.11", default-features = false, features = [ "native-tls-vendored", ] } +rstest = "0.18.2" serde_json = "1" serial_test = "0.5.0" simple_logger = "1" diff --git a/src/kv/key.rs b/src/kv/key.rs index 7ee16597..6893837c 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -71,7 +71,7 @@ pub struct Key( test, proptest(strategy = "any_with::>((size_range(_PROPTEST_KEY_MAX), ()))") )] - pub(super) Vec, + pub(crate) Vec, ); impl AsRef for kvrpcpb::Mutation { diff --git a/src/raw/client.rs b/src/raw/client.rs index 0bdc2f8b..2817de16 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -15,9 +15,13 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::metapb; use crate::raw::lowering::*; +use crate::request::APIVersion; use crate::request::Collect; use crate::request::CollectSingle; +use crate::request::EncodeVersion; +use crate::request::KeyMode; use crate::request::Plan; +use crate::request::TruncateVersion; use crate::Backoff; use crate::BoundRange; use crate::ColumnFamily; @@ -41,6 +45,7 @@ pub struct Client { backoff: Backoff, /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas). atomic: bool, + api_version: APIVersion, } impl Clone for Client { @@ -50,6 +55,7 @@ impl Clone for Client { cf: self.cf.clone(), backoff: self.backoff.clone(), atomic: self.atomic, + api_version: self.api_version, } } } @@ -100,12 +106,13 @@ impl Client { config: Config, ) -> Result { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?); + let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), false).await?); Ok(Client { rpc, cf: None, backoff: DEFAULT_REGION_BACKOFF, atomic: false, + api_version: APIVersion::V1, }) } @@ -140,6 +147,7 @@ impl Client { cf: Some(cf), backoff: self.backoff.clone(), atomic: self.atomic, + api_version: self.api_version, } } @@ -168,6 +176,7 @@ impl Client { cf: self.cf.clone(), backoff, atomic: self.atomic, + api_version: self.api_version, } } @@ -185,6 +194,20 @@ impl Client { cf: self.cf.clone(), backoff: self.backoff.clone(), atomic: true, + api_version: self.api_version, + } + } + + /// Set the API version to use. + #[must_use] + pub fn with_keyspace(&self, keyspace: String) -> Self { + // FIXME + Client { + rpc: self.rpc.clone(), + cf: self.cf.clone(), + backoff: self.backoff.clone(), + atomic: self.atomic, + api_version: APIVersion::V2 { keyspace_id: 0 }, } } } @@ -210,8 +233,9 @@ impl Client { /// ``` pub async fn get(&self, key: impl Into) -> Result> { debug!("invoking raw get request"); - let request = new_raw_get_request(key.into(), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let request = new_raw_get_request(key, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -242,14 +266,19 @@ impl Client { keys: impl IntoIterator>, ) -> Result> { debug!("invoking raw batch_get request"); - let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let keys = keys + .into_iter() + .map(|k| k.into().encode_version(self.api_version, KeyMode::Raw)); + let request = new_raw_batch_get_request(keys, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); - plan.execute() - .await - .map(|r| r.into_iter().map(Into::into).collect()) + plan.execute().await.map(|r| { + r.into_iter() + .map(|pair| pair.truncate_version(self.api_version)) + .collect() + }) } /// Create a new 'put' request. @@ -270,8 +299,9 @@ impl Client { /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking raw put request"); - let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let request = new_raw_put_request(key, value.into(), self.cf.clone(), self.atomic); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -302,12 +332,11 @@ impl Client { pairs: impl IntoIterator>, ) -> Result<()> { debug!("invoking raw batch_put request"); - let request = new_raw_batch_put_request( - pairs.into_iter().map(Into::into), - self.cf.clone(), - self.atomic, - ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let pairs = pairs + .into_iter() + .map(|pair| pair.into().encode_version(self.api_version, KeyMode::Raw)); + let request = new_raw_batch_put_request(pairs, self.cf.clone(), self.atomic); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -334,8 +363,9 @@ impl Client { /// ``` pub async fn delete(&self, key: impl Into) -> Result<()> { debug!("invoking raw delete request"); - let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let request = new_raw_delete_request(key, self.cf.clone(), self.atomic); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -364,9 +394,11 @@ impl Client { pub async fn batch_delete(&self, keys: impl IntoIterator>) -> Result<()> { debug!("invoking raw batch_delete request"); self.assert_non_atomic()?; - let request = - new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let keys = keys + .into_iter() + .map(|k| k.into().encode_version(self.api_version, KeyMode::Raw)); + let request = new_raw_batch_delete_request(keys, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -392,8 +424,9 @@ impl Client { pub async fn delete_range(&self, range: impl Into) -> Result<()> { debug!("invoking raw delete_range request"); self.assert_non_atomic()?; - let request = new_raw_delete_range_request(range.into(), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let range = range.into().encode_version(self.api_version, KeyMode::Raw); + let request = new_raw_delete_range_request(range, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -543,13 +576,14 @@ impl Client { ) -> Result<(Option, bool)> { debug!("invoking raw compare_and_swap request"); self.assert_atomic()?; + let key = key.into().encode_version(self.api_version, KeyMode::Raw); let req = new_cas_request( - key.into(), + key, new_value.into(), previous_value.into(), self.cf.clone(), ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -563,16 +597,29 @@ impl Client { copr_version_req: impl Into, ranges: impl IntoIterator>, request_builder: impl Fn(metapb::Region, Vec>) -> Vec + Send + Sync + 'static, - ) -> Result, Vec>)>> { + ) -> Result>, Vec)>> { let copr_version_req = copr_version_req.into(); semver::VersionReq::from_str(&copr_version_req)?; + let ranges = ranges + .into_iter() + .map(|range| range.into().encode_version(self.api_version, KeyMode::Raw)); + let api_version = self.api_version; + let request_builder = move |region, ranges: Vec>| { + request_builder( + region, + ranges + .into_iter() + .map(|range| range.truncate_version(api_version)) + .collect(), + ) + }; let req = new_raw_coprocessor_request( copr_name.into(), copr_version_req, - ranges.into_iter().map(Into::into), + ranges, request_builder, ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req) .preserve_shard() .retry_multi_region(self.backoff.clone()) .post_process_default() @@ -592,8 +639,9 @@ impl Client { max_limit: MAX_RAW_KV_SCAN_LIMIT, }); } + + let mut cur_range = range.into().encode_version(self.api_version, KeyMode::Raw); let mut result = Vec::new(); - let mut cur_range = range.into(); let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed(); let mut region_store = scan_regions @@ -603,15 +651,17 @@ impl Client { range: (cur_range.clone()), })??; let mut cur_limit = limit; + while cur_limit > 0 { let request = new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); - let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request) - .single_region_with_store(region_store.clone()) - .await? - .plan() - .execute() - .await?; + let resp = + crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + .single_region_with_store(region_store.clone()) + .await? + .plan() + .execute() + .await?; let mut region_scan_res = resp .kvs .into_iter() @@ -619,6 +669,7 @@ impl Client { .collect::>(); let res_len = region_scan_res.len(); result.append(&mut region_scan_res); + // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region if res_len < cur_limit as usize { region_store = match scan_regions.next().await { @@ -637,8 +688,13 @@ impl Client { break; } } + // limit is a soft limit, so we need check the number of results result.truncate(limit as usize); + + // truncate the version of keys + let result = result.truncate_version(self.api_version); + Ok(result) } @@ -655,17 +711,20 @@ impl Client { }); } - let request = new_raw_batch_scan_request( - ranges.into_iter().map(Into::into), - each_limit, - key_only, - self.cf.clone(), - ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let ranges = ranges + .into_iter() + .map(|range| range.into().encode_version(self.api_version, KeyMode::Raw)); + + let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); - plan.execute().await + plan.execute().await.map(|r| { + r.into_iter() + .map(|pair| pair.truncate_version(self.api_version)) + .collect() + }) } fn assert_non_atomic(&self) -> Result<()> { @@ -718,6 +777,7 @@ mod tests { cf: Some(ColumnFamily::Default), backoff: DEFAULT_REGION_BACKOFF, atomic: false, + api_version: APIVersion::V1, }; let resps = client .coprocessor( @@ -729,25 +789,25 @@ mod tests { .await?; let resps: Vec<_> = resps .into_iter() - .map(|(data, ranges)| (String::from_utf8(data).unwrap(), ranges)) + .map(|(ranges, data)| (ranges, String::from_utf8(data).unwrap())) .collect(); assert_eq!( resps, vec![ ( + vec![Key::from(vec![5])..Key::from(vec![10])], "1:[Key(05)..Key(0A)]".to_string(), - vec![Key::from(vec![5])..Key::from(vec![10])] ), ( - "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(), vec![ Key::from(vec![10])..Key::from(vec![15]), Key::from(vec![20])..Key::from(vec![250, 250]) - ] + ], + "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(), ), ( + vec![Key::from(vec![250, 250])..Key::from(vec![])], "3:[Key(FAFA)..Key()]".to_string(), - vec![Key::from(vec![250, 250])..Key::from(vec![])] ) ] ); diff --git a/src/raw/requests.rs b/src/raw/requests.rs index f9c64717..51a10b33 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -16,6 +16,7 @@ use crate::proto::kvrpcpb; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; +use crate::region::RegionWithLeader; use crate::request::plan::ResponseWithShard; use crate::request::Collect; use crate::request::CollectSingle; @@ -163,7 +164,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.pairs = shard; Ok(()) } @@ -296,7 +297,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.ranges = shard; Ok(()) } @@ -398,8 +399,8 @@ impl Request for RawCoprocessorRequest { self.inner.as_any() } - fn set_context(&mut self, context: kvrpcpb::Context) { - self.inner.set_context(context); + fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> { + self.inner.set_leader(leader) } fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { @@ -422,7 +423,7 @@ impl Shardable for RawCoprocessorRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.inner.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.inner.ranges = shard.clone(); self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard); Ok(()) @@ -434,7 +435,7 @@ impl Process>>>> for DefaultProcessor { - type Out = Vec<(Vec, Vec>)>; + type Out = Vec<(Vec>, Vec)>; fn process( &self, @@ -447,11 +448,11 @@ impl .map(|shard_resp| { shard_resp.map(|ResponseWithShard(resp, ranges)| { ( - resp.data, ranges .into_iter() .map(|range| range.start_key.into()..range.end_key.into()) .collect(), + resp.data, ) }) }) @@ -504,12 +505,14 @@ mod test { use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; + use crate::request::APIVersion; use crate::request::Plan; use crate::Key; - #[test] - #[ignore] - fn test_raw_scan() { + #[rstest::rstest] + #[case(APIVersion::V1)] + #[case(APIVersion::V2 { keyspace_id: 0 })] + fn test_raw_scan(#[case] api_version: APIVersion) { let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |req: &dyn Any| { let req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap(); @@ -538,7 +541,7 @@ mod test { key_only: true, ..Default::default() }; - let plan = crate::request::PlanBuilder::new(client, scan) + let plan = crate::request::PlanBuilder::new(client, APIVersion::V1, scan) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) diff --git a/src/region.rs b/src/region.rs index 8e58522c..6fb20321 100644 --- a/src/region.rs +++ b/src/region.rs @@ -2,7 +2,6 @@ use derive_new::new; -use crate::proto::kvrpcpb; use crate::proto::metapb; use crate::Error; use crate::Key; @@ -43,21 +42,6 @@ impl RegionWithLeader { key >= start_key.as_slice() && (key < end_key.as_slice() || end_key.is_empty()) } - pub fn context(&self) -> Result { - self.leader - .as_ref() - .ok_or(Error::LeaderNotFound { - region_id: self.region.id, - }) - .map(|l| { - let mut ctx = kvrpcpb::Context::default(); - ctx.region_id = self.region.id; - ctx.region_epoch = self.region.region_epoch.clone(); - ctx.peer = Some(l.clone()); - ctx - }) - } - pub fn start_key(&self) -> Key { self.region.start_key.to_vec().into() } diff --git a/src/request/api_version.rs b/src/request/api_version.rs new file mode 100644 index 00000000..98892382 --- /dev/null +++ b/src/request/api_version.rs @@ -0,0 +1,306 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use core::panic; +use std::ops::{Bound, Range}; + +use serde_derive::{Deserialize, Serialize}; + +use crate::transaction::Mutation; +use crate::{proto::kvrpcpb, store::Request, Key, Value}; +use crate::{BoundRange, KvPair, Result}; + +use super::{KvRequest, Process}; + +pub const RAW_KEY_PREFIX: u8 = b'r'; +pub const TXN_KEY_PREFIX: u8 = b'x'; +pub const MAX_KEYSPACE_ID: u32 = (1 << 24) - 1; +pub const KEYSPACE_PREFIX_LEN: usize = 4; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum APIVersion { + V1, + V2 { keyspace_id: u32 }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum KeyMode { + Raw, + Txn, +} + +impl APIVersion { + pub fn as_kvproto(&self) -> kvrpcpb::ApiVersion { + match self { + APIVersion::V1 => kvrpcpb::ApiVersion::V1, + APIVersion::V2 { .. } => kvrpcpb::ApiVersion::V2, + } + } +} + +pub trait EncodeVersion { + fn encode_version(self, api_version: APIVersion, key_mode: KeyMode) -> Self; +} + +pub trait TruncateVersion { + fn truncate_version(self, api_version: APIVersion) -> Self; +} + +// #[derive(Clone, Copy, Debug)] +// pub struct VersionProcessor(APIVersion); + +// impl EncodeVersion for crate::proto::kvrpcpb::RawBatchGetRequest { +// fn encode_version(&mut self, api_version: APIVersion) { +// self.set_api_version(api_version.as_kvproto()); + +// let prefix = match api_version { +// APIVersion::V1 => { +// return; +// } +// APIVersion::V2 { keyspace_id } => keyspace_id.to_be_bytes(), +// }; +// prefix[0] = RAW_KEY_PREFIX; + +// for key in &mut self.keys { +// prepend_bytes(key, &prefix); +// } +// } +// } + +impl EncodeVersion for Key { + fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { + let mut prefix = match api_version { + APIVersion::V1 => { + return self; + } + APIVersion::V2 { keyspace_id } => keyspace_prefix(keyspace_id, key_mode), + }; + + prepend_bytes(&mut self.0, &prefix); + + self + } +} + +impl EncodeVersion for KvPair { + fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { + self.0 = self.0.encode_version(api_version, key_mode); + self + } +} + +impl EncodeVersion for BoundRange { + fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { + self.from = map_bound(self.from, |key| key.encode_version(api_version, key_mode)); + self.to = map_bound(self.to, |key| key.encode_version(api_version, key_mode)); + self + } +} + +impl EncodeVersion for Mutation { + fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { + match self { + Mutation::Put(key, val) => { + Mutation::Put(key.encode_version(api_version, key_mode), val) + } + Mutation::Delete(key) => Mutation::Delete(key.encode_version(api_version, key_mode)), + } + } +} + +impl TruncateVersion for Key { + fn truncate_version(mut self, api_version: APIVersion) -> Self { + let prefix_len = match api_version { + APIVersion::V1 => { + return self; + } + APIVersion::V2 { keyspace_id: _ } => KEYSPACE_PREFIX_LEN, + }; + + pretruncate_bytes(&mut self.0); + + self + } +} + +impl TruncateVersion for KvPair { + fn truncate_version(mut self, api_version: APIVersion) -> Self { + self.0 = self.0.truncate_version(api_version); + self + } +} + +impl TruncateVersion for BoundRange { + fn truncate_version(mut self, api_version: APIVersion) -> Self { + self.from = map_bound(self.from, |key| key.truncate_version(api_version)); + self.to = map_bound(self.to, |key| key.truncate_version(api_version)); + self + } +} + +impl TruncateVersion for Range { + fn truncate_version(mut self, api_version: APIVersion) -> Self { + self.start = self.start.truncate_version(api_version); + self.end = self.end.truncate_version(api_version); + self + } +} + +impl TruncateVersion for Vec { + fn truncate_version(mut self, api_version: APIVersion) -> Self { + for pair in &mut self { + take_mut::take(pair, |pair| pair.truncate_version(self.api_version)); + } + self + } +} + +// impl EncodeVersion for Vec { +// fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { +// let mut prefix = match api_version { +// APIVersion::V1 => { +// return self; +// } +// APIVersion::V2 { keyspace_id } => keyspace_prefix(keyspace_id, key_mode), +// }; + +// for key in &mut self { +// prepend_bytes( &mut key.0, &prefix); +// } + +// self +// } +// } + +fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] { + let mut prefix = keyspace_id.to_be_bytes(); + prefix[0] = match key_mode { + KeyMode::Raw => RAW_KEY_PREFIX, + KeyMode::Txn => TXN_KEY_PREFIX, + }; + prefix +} + +fn prepend_bytes(vec: &mut Vec, prefix: &[u8; N]) { + unsafe { + vec.reserve_exact(N); + std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().offset(N as isize), vec.len()); + std::ptr::copy_nonoverlapping(prefix.as_ptr(), vec.as_mut_ptr(), N); + vec.set_len(vec.len() + N); + } +} + +fn pretruncate_bytes(vec: &mut Vec) { + assert!(vec.len() >= N); + unsafe { + std::ptr::copy( + vec.as_ptr().offset(N as isize), + vec.as_mut_ptr(), + vec.len() - N, + ); + vec.set_len(vec.len() - N); + } +} + +// TODO: Remove once std is stabilized. +// see https://github.com/rust-lang/rust/issues/86026 +pub fn map_bound U>(bound: Bound, f: F) -> Bound { + match bound { + Bound::Unbounded => Bound::Unbounded, + Bound::Included(x) => Bound::Included(f(x)), + Bound::Excluded(x) => Bound::Excluded(f(x)), + } +} + +// pub trait APIVersionRequest: KvRequest { +// fn encode_version(&mut self, api_version: APIVersion); +// } + +// #[derive(Clone, Copy, Debug)] +// pub struct APIVersionProcessor(APIVersion); + +// impl APIVersionRequest for crate::proto::kvrpcpb::RawBatchGetRequest { +// fn encode_version(&mut self, api_version: APIVersion) { +// self.set_api_version(api_version); + +// let mut prefix = match api_version { +// APIVersion::V1 => {return;} +// APIVersion::V2 { keyspace_id } => keyspace_id.to_be_bytes(), +// }; +// prefix[0] = RAW_KEY_PREFIX; + +// for key in &mut self.keys { +// prepend_bytes(key, &prefix); +// } +// } + +// // fn decode_respond(resp: &mut Self::Response, api_version: APIVersion) { +// // let prefix_len = match api_version { +// // APIVersion::V1 => {return;} +// // APIVersion::V2 { keyspace_id: _ } => 4, +// // }; + +// // for pair in &mut resp.pairs { +// // pretruncate_bytes(pair.key); +// // } +// // } +// } + +// impl Process for APIVersionProcessor { +// type Out = Option; + +// fn process(&self, input: Result) -> Result { +// let input = input?; +// Ok(if input.not_found { +// None +// } else { +// Some(input.value) +// }) +// } +// } + +// /// Process data into another kind of data. +// pub trait Process: Sized + Clone + Send + Sync + 'static { +// type Out: Send; + +// fn process(&self, input: Result) -> Result; +// } + +// #[derive(Clone)] +// pub struct ProcessResponse> { +// pub inner: P, +// pub processor: Pr, +// } + +// #[async_trait] +// impl> Plan for ProcessResponse { +// type Result = Pr::Out; + +// async fn execute(&self) -> Result { +// self.processor.process(self.inner.execute().await) +// } +// } + +// #[doc(hidden)] +// #[macro_export] +// macro_rules! version_codec { +// ($type_: ty) => { +// impl VersionCodec for $type_ { +// fn encode_request(req: &mut Self, api_version: APIVersion) { +// let keyspace_id = match api_version { +// APIVersion::V1 => {return;} +// APIVersion::V2 { keyspace_id } => keyspace_id, +// }; + +// let prefix = keyspace_id.to_be_bytes(); + +// for key in &mut req.keys { + +// } +// } + +// fn decode_respond(resp: &mut Self::Response, api_version: APIVersion) { + +// } +// } +// }; +// } diff --git a/src/request/mod.rs b/src/request/mod.rs index d2d58ddb..c1581fcf 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -3,6 +3,10 @@ use async_trait::async_trait; use derive_new::new; +pub use self::api_version::APIVersion; +pub use self::api_version::EncodeVersion; +pub use self::api_version::KeyMode; +pub use self::api_version::TruncateVersion; pub use self::plan::Collect; pub use self::plan::CollectError; pub use self::plan::CollectSingle; @@ -33,6 +37,7 @@ use crate::store::Request; use crate::store::{HasKeyErrors, Store}; use crate::transaction::HasLocks; +mod api_version; pub mod plan; mod plan_builder; mod shard; @@ -97,6 +102,7 @@ mod test { use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; use crate::proto::tikvpb::tikv_client::TikvClient; + use crate::region::RegionWithLeader; use crate::store::store_stream_for_keys; use crate::store::HasRegionError; use crate::transaction::lowering::new_commit_request; @@ -142,8 +148,8 @@ mod test { self } - fn set_context(&mut self, _: kvrpcpb::Context) { - unreachable!(); + fn set_leader(&mut self, _: &RegionWithLeader) -> Result<()> { + unreachable!() } fn set_api_version(&mut self, _: kvrpcpb::ApiVersion) { diff --git a/src/request/plan.rs b/src/request/plan.rs index ab72e8aa..9a2a5257 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -35,6 +35,8 @@ use crate::util::iter::FlatMapOkIterExt; use crate::Error; use crate::Result; +use super::api_version::APIVersion; + /// A plan for how to execute a request. A user builds up a plan with various /// options, then exectutes it. #[async_trait] diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 96f03dee..0087b165 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -3,6 +3,7 @@ use std::marker::PhantomData; use std::sync::Arc; +use super::api_version; use super::plan::PreserveShard; use crate::backoff::Backoff; use crate::pd::PdClient; @@ -28,7 +29,7 @@ use crate::store::RegionStore; use crate::transaction::HasLocks; use crate::transaction::ResolveLocksContext; use crate::transaction::ResolveLocksOptions; -use crate::Result; +use crate::{APIVersion, Result}; /// Builder type for plans (see that module for more). pub struct PlanBuilder { @@ -46,7 +47,8 @@ pub struct Targetted; impl PlanBuilderPhase for Targetted {} impl PlanBuilder, NoTarget> { - pub fn new(pd_client: Arc, request: Req) -> Self { + pub fn new(pd_client: Arc, api_version: APIVersion, mut request: Req) -> Self { + request.set_api_version(api_version.as_kvproto()); PlanBuilder { pd_client, plan: Dispatch { @@ -247,8 +249,7 @@ fn set_single_region_store( store: RegionStore, pd_client: Arc, ) -> Result, Targetted>> { - plan.request - .set_context(store.region_with_leader.context()?); + plan.request.set_leader(&store.region_with_leader)?; plan.kv_client = Some(store.client); Ok(PlanBuilder { plan, diff --git a/src/request/shard.rs b/src/request/shard.rs index 428e5d0e..4a74b4a6 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -164,7 +164,7 @@ macro_rules! shardable_key { mut shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; assert!(shard.len() == 1); self.key = shard.pop().unwrap(); Ok(()) @@ -197,7 +197,7 @@ macro_rules! shardable_keys { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -257,7 +257,7 @@ macro_rules! shardable_range { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. diff --git a/src/store/request.rs b/src/store/request.rs index 5fa15c6d..4aed90d6 100644 --- a/src/store/request.rs +++ b/src/store/request.rs @@ -9,6 +9,7 @@ use tonic::IntoRequest; use crate::proto::kvrpcpb; use crate::proto::tikvpb::tikv_client::TikvClient; +use crate::store::RegionWithLeader; use crate::Error; use crate::Result; @@ -21,7 +22,7 @@ pub trait Request: Any + Sync + Send + 'static { ) -> Result>; fn label(&self) -> &'static str; fn as_any(&self) -> &dyn Any; - fn set_context(&mut self, context: kvrpcpb::Context); + fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()>; fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion); } @@ -52,13 +53,20 @@ macro_rules! impl_request { self } - fn set_context(&mut self, context: kvrpcpb::Context) { - self.context = Some(context); + fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> { + let ctx = self.context.get_or_insert(kvrpcpb::Context::default()); + let leader_peer = leader.leader.as_ref().ok_or(Error::LeaderNotFound { + region_id: leader.region.id, + })?; + ctx.region_id = leader.region.id; + ctx.region_epoch = leader.region.region_epoch.clone(); + ctx.peer = Some(leader_peer.clone()); + Ok(()) } fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { - let context = self.context.get_or_insert(kvrpcpb::Context::default()); - context.api_version = api_version.into(); + let ctx = self.context.get_or_insert(kvrpcpb::Context::default()); + ctx.api_version = api_version.into(); } } }; diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 202b3665..7090ebd4 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -12,6 +12,8 @@ use crate::KvPair; use crate::Result; use crate::Value; +use super::transaction::Mutation; + /// A caching layer which buffers reads and writes in a transaction. pub struct Buffer { primary_key: Option, @@ -244,12 +246,10 @@ impl Buffer { } } - pub(crate) fn mutate(&mut self, m: kvrpcpb::Mutation) { - let op = kvrpcpb::Op::try_from(m.op).unwrap(); - match op { - kvrpcpb::Op::Put => self.put(m.key.into(), m.value), - kvrpcpb::Op::Del => self.delete(m.key.into()), - _ => unimplemented!("only put and delete are supported in mutate"), + pub(crate) fn mutate(&mut self, m: Mutation) { + match m { + Mutation::Put(key, value) => self.put(key, value), + Mutation::Delete(key) => self.delete(key), }; } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 1ba0be81..b140c49c 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -11,6 +11,7 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; use crate::request::plan::CleanupLocksResult; +use crate::request::APIVersion; use crate::request::Plan; use crate::timestamp::TimestampExt; use crate::transaction::lock::ResolveLocksOptions; @@ -45,12 +46,14 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// awaited to execute. pub struct Client { pd: Arc, + api_version: APIVersion, } impl Clone for Client { fn clone(&self) -> Self { Self { pd: self.pd.clone(), + api_version: self.api_version, } } } @@ -104,7 +107,20 @@ impl Client { debug!("creating new transactional client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?); - Ok(Client { pd }) + Ok(Client { + pd, + api_version: APIVersion::V1, + }) + } + + /// Set the API version to use. + #[must_use] + pub fn with_keyspace(&self, keyspace: String) -> Self { + // FIXME + Client { + pd: self.pd.clone(), + api_version: APIVersion::V2 { keyspace_id: 0 }, + } } /// Creates a new optimistic [`Transaction`]. @@ -290,6 +306,6 @@ impl Client { } fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { - Transaction::new(timestamp, self.pd.clone(), options) + Transaction::new(timestamp, self.pd.clone(), options, self.api_version) } } diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 81a290fa..5bc8f0e4 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -15,6 +15,7 @@ pub use snapshot::Snapshot; pub use transaction::CheckLevel; #[doc(hidden)] pub use transaction::HeartbeatOption; +pub use transaction::Mutation; pub use transaction::Transaction; pub use transaction::TransactionOptions; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index c08025e0..dfada4d7 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -39,6 +39,7 @@ use crate::shardable_keys; use crate::shardable_range; use crate::store::store_stream_for_range; use crate::store::RegionStore; +use crate::store::Request; use crate::store::{store_stream_for_keys, Store}; use crate::timestamp::TimestampExt; use crate::transaction::HasLocks; @@ -297,7 +298,7 @@ impl Shardable for kvrpcpb::PrewriteRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { @@ -364,7 +365,7 @@ impl Shardable for kvrpcpb::CommitRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -455,7 +456,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.mutations = shard; Ok(()) } @@ -556,7 +557,7 @@ impl Shardable for kvrpcpb::ScanLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.start_key = shard.0; Ok(()) } @@ -617,7 +618,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; assert!(shard.len() == 1); self.primary_lock = shard.pop().unwrap(); Ok(()) @@ -675,7 +676,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; assert!(shard.len() == 1); self.primary_key = shard.pop().unwrap(); Ok(()) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e0f79170..19a7e0e6 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -19,13 +19,17 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; +use crate::request::APIVersion; use crate::request::Collect; use crate::request::CollectError; use crate::request::CollectSingle; use crate::request::CollectWithShard; +use crate::request::EncodeVersion; +use crate::request::KeyMode; use crate::request::Plan; use crate::request::PlanBuilder; use crate::request::RetryOptions; +use crate::request::TruncateVersion; use crate::timestamp::TimestampExt; use crate::transaction::buffer::Buffer; use crate::transaction::lowering::*; @@ -80,6 +84,7 @@ pub struct Transaction { buffer: Buffer, rpc: Arc, options: TransactionOptions, + api_version: APIVersion, is_heartbeat_started: bool, start_instant: Instant, } @@ -89,6 +94,7 @@ impl Transaction { timestamp: Timestamp, rpc: Arc, options: TransactionOptions, + api_version: APIVersion, ) -> Transaction { let status = if options.read_only { TransactionStatus::ReadOnly @@ -101,6 +107,7 @@ impl Transaction { buffer: Buffer::new(options.is_pessimistic()), rpc, options, + api_version, is_heartbeat_started: false, start_instant: std::time::Instant::now(), } @@ -129,13 +136,14 @@ impl Transaction { self.check_allow_operation().await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); - let key = key.into(); + let key = key.into().encode_version(self.api_version, KeyMode::Txn); let retry_options = self.options.retry_options.clone(); + let api_version = self.api_version; self.buffer .get_or_else(key, |key| async move { let request = new_get_request(key, timestamp); - let plan = PlanBuilder::new(rpc, request) + let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -193,7 +201,7 @@ impl Transaction { self.check_allow_operation().await?; if !self.is_pessimistic() { let key = key.into(); - self.lock_keys(iter::once(key.clone())).await?; + self.lock_keys_inner(iter::once(key.clone())).await?; self.get(key).await } else { let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?; @@ -260,12 +268,16 @@ impl Transaction { self.check_allow_operation().await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); + let api_version = self.api_version; + let keys = keys + .into_iter() + .map(move |k| k.into().encode_version(api_version, KeyMode::Txn)); let retry_options = self.options.retry_options.clone(); self.buffer - .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move { + .batch_get_or_else(keys, move |keys| async move { let request = new_batch_get_request(keys, timestamp); - let plan = PlanBuilder::new(rpc, request) + let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) @@ -275,6 +287,7 @@ impl Transaction { .map(|r| r.into_iter().map(Into::into).collect()) }) .await + .map(move |pairs| pairs.map(move |pair| pair.truncate_version(api_version))) } /// Create a new 'batch get for update' request. @@ -312,7 +325,7 @@ impl Transaction { self.check_allow_operation().await?; let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); if !self.is_pessimistic() { - self.lock_keys(keys.clone()).await?; + self.lock_keys_inner(keys.clone()).await?; Ok(self.batch_get(keys).await?.collect()) } else { self.pessimistic_lock(keys, true).await @@ -441,7 +454,7 @@ impl Transaction { pub async fn put(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking transactional put request"); self.check_allow_operation().await?; - let key = key.into(); + let key = key.into().encode_version(self.api_version, KeyMode::Txn); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) .await?; @@ -472,7 +485,7 @@ impl Transaction { pub async fn insert(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking transactional insert request"); self.check_allow_operation().await?; - let key = key.into(); + let key = key.into().encode_version(self.api_version, KeyMode::Txn); if self.buffer.get(&key).is_some() { return Err(Error::DuplicateKeyInsertion); } @@ -507,7 +520,7 @@ impl Transaction { pub async fn delete(&mut self, key: impl Into) -> Result<()> { debug!("invoking transactional delete request"); self.check_allow_operation().await?; - let key = key.into(); + let key = key.into().encode_version(self.api_version, KeyMode::Txn); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) .await?; @@ -547,13 +560,16 @@ impl Transaction { /// ``` pub async fn batch_mutate( &mut self, - mutations: impl IntoIterator, + mutations: impl IntoIterator, ) -> Result<()> { debug!("invoking transactional batch mutate request"); self.check_allow_operation().await?; if self.is_pessimistic() { - let mutations: Vec = mutations.into_iter().collect(); - self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key.clone())), false) + let mutations: Vec = mutations + .into_iter() + .map(|mutation| mutation.encode_version(self.api_version, KeyMode::Txn)) + .collect(); + self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key().clone())), false) .await?; for m in mutations { self.buffer.mutate(m); @@ -592,6 +608,18 @@ impl Transaction { pub async fn lock_keys( &mut self, keys: impl IntoIterator>, + ) -> Result<()> { + let api_version = self.api_version; + let keys = keys + .into_iter() + .map(move |k| k.into().encode_version(api_version, KeyMode::Txn)); + self.lock_keys_inner(keys).await + } + + /// Lock key with keys already encoded version. + async fn lock_keys_inner( + &mut self, + keys: impl IntoIterator>, ) -> Result<()> { debug!("invoking transactional lock_keys request"); self.check_allow_operation().await?; @@ -653,6 +681,7 @@ impl Transaction { self.timestamp.clone(), self.rpc.clone(), self.options.clone(), + self.api_version, self.buffer.get_write_size() as u64, self.start_instant, ) @@ -705,6 +734,7 @@ impl Transaction { self.timestamp.clone(), self.rpc.clone(), self.options.clone(), + self.api_version, self.buffer.get_write_size() as u64, self.start_instant, ) @@ -738,7 +768,7 @@ impl Transaction { primary_key, self.start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let plan = PlanBuilder::new(self.rpc.clone(), request) + let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectSingle) @@ -758,17 +788,19 @@ impl Transaction { let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); let retry_options = self.options.retry_options.clone(); + let api_version = self.api_version; + let range = range.into().encode_version(self.api_version, KeyMode::Raw); self.buffer .scan_and_fetch( - range.into(), + range, limit, !key_only, reverse, move |new_range, new_limit| async move { let request = new_scan_request(new_range, timestamp, new_limit, key_only, reverse); - let plan = PlanBuilder::new(rpc, request) + let plan = PlanBuilder::new(rpc, api_version, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) @@ -778,7 +810,7 @@ impl Transaction { .map(|r| r.into_iter().map(Into::into).collect()) }, ) - .await + .await.map(move |pairs| pairs.map(move |pair| pair.truncate_version(api_version))) } /// Pessimistically lock the keys, and optionally retrieve corresponding values. @@ -823,7 +855,7 @@ impl Transaction { for_update_ts.clone(), need_value, ); - let plan = PlanBuilder::new(self.rpc.clone(), request) + let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) @@ -877,7 +909,7 @@ impl Transaction { start_version, for_update_ts, ); - let plan = PlanBuilder::new(self.rpc.clone(), req) + let plan = PlanBuilder::new(self.rpc.clone(),self.api_version, req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -926,6 +958,7 @@ impl Transaction { HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval, }; let start_instant = self.start_instant; + let api_version = self.api_version; let heartbeat_task = async move { loop { @@ -946,7 +979,7 @@ impl Transaction { primary_key.clone(), start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let plan = PlanBuilder::new(rpc.clone(), request) + let plan = PlanBuilder::new(rpc.clone(), api_version, request) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) .plan(); @@ -1192,6 +1225,21 @@ impl HeartbeatOption { } } +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum Mutation { + Put(Key, Value), + Delete(Key), +} + +impl Mutation { + pub fn key(&self) -> &Key { + match self { + Mutation::Put(key, _) => key, + Mutation::Delete(key) => key, + } + } +} + /// A struct wrapping the details of two-phase commit protocol (2PC). /// /// The two phases are `prewrite` and `commit`. @@ -1207,6 +1255,7 @@ struct Committer { start_version: Timestamp, rpc: Arc, options: TransactionOptions, + api_version: APIVersion, #[new(default)] undetermined: bool, write_size: u64, @@ -1284,7 +1333,7 @@ impl Committer { .collect(); // FIXME set max_commit_ts and min_commit_ts - let plan = PlanBuilder::new(self.rpc.clone(), request) + let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) @@ -1324,7 +1373,7 @@ impl Committer { self.start_version.clone(), commit_version.clone(), ); - let plan = PlanBuilder::new(self.rpc.clone(), req) + let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -1388,7 +1437,7 @@ impl Committer { .filter(|key| &primary_key != key); new_commit_request(keys, self.start_version, commit_version) }; - let plan = PlanBuilder::new(self.rpc, req) + let plan = PlanBuilder::new(self.rpc, self.api_version, req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() @@ -1409,7 +1458,7 @@ impl Committer { match self.options.kind { TransactionKind::Optimistic => { let req = new_batch_rollback_request(keys, self.start_version); - let plan = PlanBuilder::new(self.rpc, req) + let plan = PlanBuilder::new(self.rpc, self.api_version, req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() @@ -1418,7 +1467,7 @@ impl Committer { } TransactionKind::Pessimistic(for_update_ts) => { let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); - let plan = PlanBuilder::new(self.rpc, req) + let plan = PlanBuilder::new(self.rpc, self.api_version, req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error()