diff --git a/src/raw/client.rs b/src/raw/client.rs index 72c0278a..4735f649 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -15,13 +15,13 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::metapb; use crate::raw::lowering::*; -use crate::request::APIVersion; use crate::request::Collect; use crate::request::CollectSingle; -use crate::request::EncodeVersion; +use crate::request::EncodeKeyspace; use crate::request::KeyMode; +use crate::request::Keyspace; use crate::request::Plan; -use crate::request::TruncateVersion; +use crate::request::TruncateKeyspace; use crate::Backoff; use crate::BoundRange; use crate::ColumnFamily; @@ -45,7 +45,7 @@ pub struct Client { backoff: Backoff, /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas). atomic: bool, - api_version: APIVersion, + keyspace: Keyspace, } impl Clone for Client { @@ -55,7 +55,7 @@ impl Clone for Client { cf: self.cf.clone(), backoff: self.backoff.clone(), atomic: self.atomic, - api_version: self.api_version, + keyspace: self.keyspace, } } } @@ -109,19 +109,19 @@ impl Client { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?); - let api_version = match config.keyspace { + let keyspace = match config.keyspace { Some(keyspace) => { let keyspace_id = rpc.get_keyspace_id(&keyspace).await?; - APIVersion::V2 { keyspace_id } + Keyspace::Enable { keyspace_id } } - None => APIVersion::V1, + None => Keyspace::Disable, }; Ok(Client { rpc, cf: None, backoff: DEFAULT_REGION_BACKOFF, atomic: false, - api_version, + keyspace, }) } @@ -156,7 +156,7 @@ impl Client { cf: Some(cf), backoff: self.backoff.clone(), atomic: self.atomic, - api_version: self.api_version, + keyspace: self.keyspace, } } @@ -185,7 +185,7 @@ impl Client { cf: self.cf.clone(), backoff, atomic: self.atomic, - api_version: self.api_version, + keyspace: self.keyspace, } } @@ -203,7 +203,7 @@ impl Client { cf: self.cf.clone(), backoff: self.backoff.clone(), atomic: true, - api_version: self.api_version, + keyspace: self.keyspace, } } } @@ -229,9 +229,9 @@ impl Client { /// ``` pub async fn get(&self, key: impl Into) -> Result> { debug!("invoking raw get request"); - let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); let request = new_raw_get_request(key, self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -264,15 +264,15 @@ impl Client { debug!("invoking raw batch_get request"); let keys = keys .into_iter() - .map(|k| k.into().encode_version(self.api_version, KeyMode::Raw)); + .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw)); let request = new_raw_batch_get_request(keys, self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); plan.execute().await.map(|r| { r.into_iter() - .map(|pair| pair.truncate_version(self.api_version)) + .map(|pair| pair.truncate_keyspace(self.keyspace)) .collect() }) } @@ -295,9 +295,9 @@ impl Client { /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking raw put request"); - let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); let request = new_raw_put_request(key, value.into(), self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -330,9 +330,9 @@ impl Client { debug!("invoking raw batch_put request"); let pairs = pairs .into_iter() - .map(|pair| pair.into().encode_version(self.api_version, KeyMode::Raw)); + .map(|pair| pair.into().encode_keyspace(self.keyspace, KeyMode::Raw)); let request = new_raw_batch_put_request(pairs, self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -359,9 +359,9 @@ impl Client { /// ``` pub async fn delete(&self, key: impl Into) -> Result<()> { debug!("invoking raw delete request"); - let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); let request = new_raw_delete_request(key, self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -392,9 +392,9 @@ impl Client { self.assert_non_atomic()?; let keys = keys .into_iter() - .map(|k| k.into().encode_version(self.api_version, KeyMode::Raw)); + .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw)); let request = new_raw_batch_delete_request(keys, self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -420,9 +420,9 @@ impl Client { pub async fn delete_range(&self, range: impl Into) -> Result<()> { debug!("invoking raw delete_range request"); self.assert_non_atomic()?; - let range = range.into().encode_version(self.api_version, KeyMode::Raw); + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let request = new_raw_delete_range_request(range, self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -572,14 +572,14 @@ impl Client { ) -> Result<(Option, bool)> { debug!("invoking raw compare_and_swap request"); self.assert_atomic()?; - let key = key.into().encode_version(self.api_version, KeyMode::Raw); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); let req = new_cas_request( key, new_value.into(), previous_value.into(), self.cf.clone(), ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -598,14 +598,14 @@ impl Client { semver::VersionReq::from_str(&copr_version_req)?; let ranges = ranges .into_iter() - .map(|range| range.into().encode_version(self.api_version, KeyMode::Raw)); - let api_version = self.api_version; + .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw)); + let keyspace = self.keyspace; let request_builder = move |region, ranges: Vec>| { request_builder( region, ranges .into_iter() - .map(|range| range.truncate_version(api_version)) + .map(|range| range.truncate_keyspace(keyspace)) .collect(), ) }; @@ -615,7 +615,7 @@ impl Client { ranges, request_builder, ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .preserve_shard() .retry_multi_region(self.backoff.clone()) .post_process_default() @@ -624,7 +624,7 @@ impl Client { .execute() .await? .into_iter() - .map(|(ranges, data)| (ranges.truncate_version(api_version), data)) + .map(|(ranges, data)| (ranges.truncate_keyspace(keyspace), data)) .collect()) } @@ -641,7 +641,7 @@ impl Client { }); } - let mut cur_range = range.into().encode_version(self.api_version, KeyMode::Raw); + let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let mut result = Vec::new(); let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed(); let mut region_store = @@ -656,13 +656,12 @@ impl Client { while cur_limit > 0 { let request = new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); - let resp = - crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) - .single_region_with_store(region_store.clone()) - .await? - .plan() - .execute() - .await?; + let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) + .single_region_with_store(region_store.clone()) + .await? + .plan() + .execute() + .await?; let mut region_scan_res = resp .kvs .into_iter() @@ -694,7 +693,7 @@ impl Client { result.truncate(limit as usize); // truncate the version of keys - let result = result.truncate_version(self.api_version); + let result = result.truncate_keyspace(self.keyspace); Ok(result) } @@ -714,16 +713,16 @@ impl Client { let ranges = ranges .into_iter() - .map(|range| range.into().encode_version(self.api_version, KeyMode::Raw)); + .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw)); let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); plan.execute().await.map(|r| { r.into_iter() - .map(|pair| pair.truncate_version(self.api_version)) + .map(|pair| pair.truncate_keyspace(self.keyspace)) .collect() }) } @@ -778,7 +777,7 @@ mod tests { cf: Some(ColumnFamily::Default), backoff: DEFAULT_REGION_BACKOFF, atomic: false, - api_version: APIVersion::V2 { keyspace_id: 0 }, + keyspace: Keyspace::Enable { keyspace_id: 0 }, }; let resps = client .coprocessor( diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 568b69a4..2d2677ee 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -503,15 +503,15 @@ mod test { use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; - use crate::request::APIVersion; + use crate::request::Keyspace; use crate::request::Plan; use crate::Key; #[rstest::rstest] - #[case(APIVersion::V1)] - #[case(APIVersion::V2 { keyspace_id: 0 })] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] - async fn test_raw_scan(#[case] api_version: APIVersion) { + async fn test_raw_scan(#[case] keyspace: Keyspace) { let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |req: &dyn Any| { let req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap(); @@ -540,8 +540,8 @@ mod test { key_only: true, ..Default::default() }; - let plan = crate::request::PlanBuilder::new(client, api_version, scan) - .resolve_lock(OPTIMISTIC_BACKOFF, api_version) + let plan = crate::request::PlanBuilder::new(client, keyspace, scan) + .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); diff --git a/src/request/api_version.rs b/src/request/keyspace.rs similarity index 58% rename from src/request/api_version.rs rename to src/request/keyspace.rs index 76022241..7b8d4bba 100644 --- a/src/request/api_version.rs +++ b/src/request/keyspace.rs @@ -13,9 +13,9 @@ pub const TXN_KEY_PREFIX: u8 = b'x'; pub const KEYSPACE_PREFIX_LEN: usize = 4; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub enum APIVersion { - V1, - V2 { keyspace_id: u32 }, +pub enum Keyspace { + Disable, + Enable { keyspace_id: u32 }, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -24,30 +24,30 @@ pub enum KeyMode { Txn, } -impl APIVersion { - pub fn as_kvproto(&self) -> kvrpcpb::ApiVersion { +impl Keyspace { + pub fn api_version(&self) -> kvrpcpb::ApiVersion { match self { - APIVersion::V1 => kvrpcpb::ApiVersion::V1, - APIVersion::V2 { .. } => kvrpcpb::ApiVersion::V2, + Keyspace::Disable => kvrpcpb::ApiVersion::V1, + Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2, } } } -pub trait EncodeVersion { - fn encode_version(self, api_version: APIVersion, key_mode: KeyMode) -> Self; +pub trait EncodeKeyspace { + fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self; } -pub trait TruncateVersion { - fn truncate_version(self, api_version: APIVersion) -> Self; +pub trait TruncateKeyspace { + fn truncate_keyspace(self, keyspace: Keyspace) -> Self; } -impl EncodeVersion for Key { - fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { - let prefix = match api_version { - APIVersion::V1 => { +impl EncodeKeyspace for Key { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + let prefix = match keyspace { + Keyspace::Disable => { return self; } - APIVersion::V2 { keyspace_id } => keyspace_prefix(keyspace_id, key_mode), + Keyspace::Enable { keyspace_id } => keyspace_prefix(keyspace_id, key_mode), }; prepend_bytes(&mut self.0, &prefix); @@ -56,59 +56,57 @@ impl EncodeVersion for Key { } } -impl EncodeVersion for KvPair { - fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { - self.0 = self.0.encode_version(api_version, key_mode); +impl EncodeKeyspace for KvPair { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + self.0 = self.0.encode_keyspace(keyspace, key_mode); self } } -impl EncodeVersion for BoundRange { - fn encode_version(mut self, api_version: APIVersion, key_mode: KeyMode) -> Self { +impl EncodeKeyspace for BoundRange { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { self.from = match self.from { - Bound::Included(key) => Bound::Included(key.encode_version(api_version, key_mode)), - Bound::Excluded(key) => Bound::Excluded(key.encode_version(api_version, key_mode)), + Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)), + Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)), Bound::Unbounded => { let key = Key::from(vec![]); - Bound::Included(key.encode_version(api_version, key_mode)) + Bound::Included(key.encode_keyspace(keyspace, key_mode)) } }; self.to = match self.to { Bound::Included(key) if !key.is_empty() => { - Bound::Included(key.encode_version(api_version, key_mode)) + Bound::Included(key.encode_keyspace(keyspace, key_mode)) } Bound::Excluded(key) if !key.is_empty() => { - Bound::Excluded(key.encode_version(api_version, key_mode)) + Bound::Excluded(key.encode_keyspace(keyspace, key_mode)) } _ => { let key = Key::from(vec![]); - let api_version = match api_version { - APIVersion::V1 => APIVersion::V1, - APIVersion::V2 { keyspace_id } => APIVersion::V2 { + let keyspace = match keyspace { + Keyspace::Disable => Keyspace::Disable, + Keyspace::Enable { keyspace_id } => Keyspace::Enable { keyspace_id: keyspace_id + 1, }, }; - Bound::Excluded(key.encode_version(api_version, key_mode)) + Bound::Excluded(key.encode_keyspace(keyspace, key_mode)) } }; self } } -impl EncodeVersion for Mutation { - fn encode_version(self, api_version: APIVersion, key_mode: KeyMode) -> Self { +impl EncodeKeyspace for Mutation { + fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self { match self { - Mutation::Put(key, val) => { - Mutation::Put(key.encode_version(api_version, key_mode), val) - } - Mutation::Delete(key) => Mutation::Delete(key.encode_version(api_version, key_mode)), + Mutation::Put(key, val) => Mutation::Put(key.encode_keyspace(keyspace, key_mode), val), + Mutation::Delete(key) => Mutation::Delete(key.encode_keyspace(keyspace, key_mode)), } } } -impl TruncateVersion for Key { - fn truncate_version(mut self, api_version: APIVersion) -> Self { - if let APIVersion::V1 = api_version { +impl TruncateKeyspace for Key { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + if let Keyspace::Disable = keyspace { return self; } @@ -118,34 +116,34 @@ impl TruncateVersion for Key { } } -impl TruncateVersion for KvPair { - fn truncate_version(mut self, api_version: APIVersion) -> Self { - self.0 = self.0.truncate_version(api_version); +impl TruncateKeyspace for KvPair { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + self.0 = self.0.truncate_keyspace(keyspace); self } } -impl TruncateVersion for Range { - fn truncate_version(mut self, api_version: APIVersion) -> Self { - self.start = self.start.truncate_version(api_version); - self.end = self.end.truncate_version(api_version); +impl TruncateKeyspace for Range { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + self.start = self.start.truncate_keyspace(keyspace); + self.end = self.end.truncate_keyspace(keyspace); self } } -impl TruncateVersion for Vec> { - fn truncate_version(mut self, api_version: APIVersion) -> Self { +impl TruncateKeyspace for Vec> { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { for range in &mut self { - take_mut::take(range, |range| range.truncate_version(api_version)); + take_mut::take(range, |range| range.truncate_keyspace(keyspace)); } self } } -impl TruncateVersion for Vec { - fn truncate_version(mut self, api_version: APIVersion) -> Self { +impl TruncateKeyspace for Vec { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { for pair in &mut self { - take_mut::take(pair, |pair| pair.truncate_version(api_version)); + take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace)); } self } @@ -196,35 +194,35 @@ mod tests { #[test] fn test_encode_version() { - let api_version = APIVersion::V2 { + let keyspace = Keyspace::Enable { keyspace_id: 0xDEAD, }; let key_mode = KeyMode::Raw; let key = Key::from(vec![0xBE, 0xEF]); let expected_key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); - assert_eq!(key.encode_version(api_version, key_mode), expected_key); + assert_eq!(key.encode_keyspace(keyspace, key_mode), expected_key); let bound: BoundRange = (Key::from(vec![0xDE, 0xAD])..Key::from(vec![0xBE, 0xEF])).into(); let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xDE, 0xAD]) ..Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF])) .into(); - assert_eq!(bound.encode_version(api_version, key_mode), expected_bound); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); let bound: BoundRange = (..).into(); let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into(); - assert_eq!(bound.encode_version(api_version, key_mode), expected_bound); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); let bound: BoundRange = (Key::from(vec![])..Key::from(vec![])).into(); let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into(); - assert_eq!(bound.encode_version(api_version, key_mode), expected_bound); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); let bound: BoundRange = (Key::from(vec![])..=Key::from(vec![])).into(); let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into(); - assert_eq!(bound.encode_version(api_version, key_mode), expected_bound); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); let mutation = Mutation::Put(Key::from(vec![0xBE, 0xEF]), vec![4, 5, 6]); let expected_mutation = Mutation::Put( @@ -232,14 +230,14 @@ mod tests { vec![4, 5, 6], ); assert_eq!( - mutation.encode_version(api_version, key_mode), + mutation.encode_keyspace(keyspace, key_mode), expected_mutation ); let mutation = Mutation::Delete(Key::from(vec![0xBE, 0xEF])); let expected_mutation = Mutation::Delete(Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF])); assert_eq!( - mutation.encode_version(api_version, key_mode), + mutation.encode_keyspace(keyspace, key_mode), expected_mutation ); } @@ -247,17 +245,17 @@ mod tests { #[test] fn test_truncate_version() { let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); - let api_version = APIVersion::V2 { + let keyspace = Keyspace::Enable { keyspace_id: 0xDEAD, }; let expected_key = Key::from(vec![0xBE, 0xEF]); - assert_eq!(key.truncate_version(api_version), expected_key); + assert_eq!(key.truncate_keyspace(keyspace), expected_key); let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]); - let api_version = APIVersion::V2 { + let keyspace = Keyspace::Enable { keyspace_id: 0xDEAD, }; let expected_key = Key::from(vec![0xBE, 0xEF]); - assert_eq!(key.truncate_version(api_version), expected_key); + assert_eq!(key.truncate_keyspace(keyspace), expected_key); } } diff --git a/src/request/mod.rs b/src/request/mod.rs index 7a7bde1e..14de8e90 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -3,10 +3,10 @@ use async_trait::async_trait; use derive_new::new; -pub use self::api_version::APIVersion; -pub use self::api_version::EncodeVersion; -pub use self::api_version::KeyMode; -pub use self::api_version::TruncateVersion; +pub use self::keyspace::EncodeKeyspace; +pub use self::keyspace::KeyMode; +pub use self::keyspace::Keyspace; +pub use self::keyspace::TruncateKeyspace; pub use self::plan::Collect; pub use self::plan::CollectError; pub use self::plan::CollectSingle; @@ -37,7 +37,7 @@ use crate::store::Request; use crate::store::{HasKeyErrors, Store}; use crate::transaction::HasLocks; -mod api_version; +mod keyspace; pub mod plan; mod plan_builder; mod shard; @@ -198,8 +198,8 @@ mod test { |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box), ))); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), APIVersion::V1, request) - .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3), APIVersion::V1) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, request) + .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3), Keyspace::Disable) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); @@ -224,15 +224,16 @@ mod test { let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default()); // does not extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), APIVersion::V1, req.clone()) - .resolve_lock(OPTIMISTIC_BACKOFF, APIVersion::V1) - .retry_multi_region(OPTIMISTIC_BACKOFF) - .plan(); + let plan = + crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req.clone()) + .resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable) + .retry_multi_region(OPTIMISTIC_BACKOFF) + .plan(); assert!(plan.execute().await.is_ok()); // extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), APIVersion::V1, req) - .resolve_lock(OPTIMISTIC_BACKOFF, APIVersion::V1) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req) + .resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); diff --git a/src/request/plan.rs b/src/request/plan.rs index bf8b04dd..06acbe47 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -35,7 +35,7 @@ use crate::util::iter::FlatMapOkIterExt; use crate::Error; use crate::Result; -use super::api_version::APIVersion; +use super::keyspace::Keyspace; /// A plan for how to execute a request. A user builds up a plan with various /// options, then exectutes it. @@ -548,7 +548,7 @@ pub struct ResolveLock { pub inner: P, pub pd_client: Arc, pub backoff: Backoff, - pub api_version: APIVersion, + pub keyspace: Keyspace, } impl Clone for ResolveLock { @@ -557,7 +557,7 @@ impl Clone for ResolveLock { inner: self.inner.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), - api_version: self.api_version, + keyspace: self.keyspace, } } } @@ -583,7 +583,7 @@ where } let pd_client = self.pd_client.clone(); - let live_locks = resolve_locks(locks, pd_client.clone(), self.api_version).await?; + let live_locks = resolve_locks(locks, pd_client.clone(), self.keyspace).await?; if live_locks.is_empty() { result = self.inner.execute().await?; } else { @@ -648,7 +648,7 @@ pub struct CleanupLocks { pub options: ResolveLocksOptions, pub store: Option, pub pd_client: Arc, - pub api_version: APIVersion, + pub keyspace: Keyspace, pub backoff: Backoff, } @@ -660,7 +660,7 @@ impl Clone for CleanupLocks { options: self.options, store: None, pd_client: self.pd_client.clone(), - api_version: self.api_version, + keyspace: self.keyspace, backoff: self.backoff.clone(), } } @@ -725,7 +725,7 @@ where self.store.clone().unwrap(), locks, self.pd_client.clone(), - self.api_version, + self.keyspace, ) .await { @@ -902,7 +902,7 @@ mod test { inner: ErrPlan, backoff: Backoff::no_backoff(), pd_client: Arc::new(MockPdClient::default()), - api_version: APIVersion::V1, + keyspace: Keyspace::Disable, }, pd_client: Arc::new(MockPdClient::default()), backoff: Backoff::no_backoff(), diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 83218efc..c117d14b 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; use std::sync::Arc; use super::plan::PreserveShard; -use super::APIVersion; +use super::Keyspace; use crate::backoff::Backoff; use crate::pd::PdClient; use crate::request::plan::{CleanupLocks, RetryableAllStores}; @@ -47,8 +47,8 @@ pub struct Targetted; impl PlanBuilderPhase for Targetted {} impl PlanBuilder, NoTarget> { - pub fn new(pd_client: Arc, api_version: APIVersion, mut request: Req) -> Self { - request.set_api_version(api_version.as_kvproto()); + pub fn new(pd_client: Arc, keyspace: Keyspace, mut request: Req) -> Self { + request.set_api_version(keyspace.api_version()); PlanBuilder { pd_client, plan: Dispatch { @@ -73,7 +73,7 @@ impl PlanBuilder { pub fn resolve_lock( self, backoff: Backoff, - api_version: APIVersion, + keyspace: Keyspace, ) -> PlanBuilder, Ph> where P::Result: HasLocks, @@ -84,7 +84,7 @@ impl PlanBuilder { inner: self.plan, backoff, pd_client: self.pd_client, - api_version, + keyspace, }, phantom: PhantomData, } @@ -95,7 +95,7 @@ impl PlanBuilder { ctx: ResolveLocksContext, options: ResolveLocksOptions, backoff: Backoff, - api_version: APIVersion, + keyspace: Keyspace, ) -> PlanBuilder, Ph> where P: Shardable + NextBatch, @@ -110,7 +110,7 @@ impl PlanBuilder { store: None, backoff, pd_client: self.pd_client, - api_version, + keyspace, }, phantom: PhantomData, } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 0b94c697..384551fe 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -11,9 +11,9 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; use crate::request::plan::CleanupLocksResult; -use crate::request::APIVersion; -use crate::request::EncodeVersion; +use crate::request::EncodeKeyspace; use crate::request::KeyMode; +use crate::request::Keyspace; use crate::request::Plan; use crate::timestamp::TimestampExt; use crate::transaction::lock::ResolveLocksOptions; @@ -48,14 +48,14 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// awaited to execute. pub struct Client { pd: Arc, - api_version: APIVersion, + keyspace: Keyspace, } impl Clone for Client { fn clone(&self) -> Self { Self { pd: self.pd.clone(), - api_version: self.api_version, + keyspace: self.keyspace, } } } @@ -109,14 +109,14 @@ impl Client { debug!("creating new transactional client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?); - let api_version = match config.keyspace { + let keyspace = match config.keyspace { Some(keyspace) => { let keyspace_id = pd.get_keyspace_id(&keyspace).await?; - APIVersion::V2 { keyspace_id } + Keyspace::Enable { keyspace_id } } - None => APIVersion::V1, + None => Keyspace::Disable, }; - Ok(Client { pd, api_version }) + Ok(Client { pd, keyspace }) } /// Creates a new optimistic [`Transaction`]. @@ -258,10 +258,10 @@ impl Client { // scan all locks with ts <= safepoint let ctx = ResolveLocksContext::default(); let backoff = Backoff::equal_jitter_backoff(100, 10000, 50); - let range = range.into().encode_version(self.api_version, KeyMode::Txn); + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); let req = new_scan_lock_request(range, safepoint, options.batch_size); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req) - .cleanup_locks(ctx.clone(), options, backoff, self.api_version) + let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req) + .cleanup_locks(ctx.clone(), options, backoff, self.keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .merge(crate::request::Collect) @@ -278,9 +278,9 @@ impl Client { range: impl Into, batch_size: u32, ) -> Result> { - let range = range.into().encode_version(self.api_version, KeyMode::Txn); + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); let req = new_scan_lock_request(range, safepoint, batch_size); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req) + let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(crate::request::Collect) .plan(); @@ -295,9 +295,9 @@ impl Client { /// /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB. pub async fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { - let range = range.into().encode_version(self.api_version, KeyMode::Txn); + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); let req = new_unsafe_destroy_range_request(range); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.api_version, req) + let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req) .all_stores(DEFAULT_STORE_BACKOFF) .merge(crate::request::Collect) .plan(); @@ -305,6 +305,6 @@ impl Client { } fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { - Transaction::new(timestamp, self.pd.clone(), options, self.api_version) + Transaction::new(timestamp, self.pd.clone(), options, self.keyspace) } } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 68e43735..a2182c7a 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -18,9 +18,9 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::pdpb::Timestamp; use crate::region::RegionVerId; -use crate::request::APIVersion; use crate::request::Collect; use crate::request::CollectSingle; +use crate::request::Keyspace; use crate::request::Plan; use crate::store::RegionStore; use crate::timestamp::TimestampExt; @@ -45,7 +45,7 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; pub async fn resolve_locks( locks: Vec, pd_client: Arc, - api_version: APIVersion, + keyspace: Keyspace, ) -> Result /* live_locks */> { debug!("resolving locks"); let ts = pd_client.clone().get_timestamp().await?; @@ -78,13 +78,12 @@ pub async fn resolve_locks( Some(&commit_version) => commit_version, None => { let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); - let plan = - crate::request::PlanBuilder::new(pd_client.clone(), api_version, request) - .resolve_lock(OPTIMISTIC_BACKOFF, api_version) - .retry_multi_region(DEFAULT_REGION_BACKOFF) - .merge(CollectSingle) - .post_process_default() - .plan(); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) + .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) + .post_process_default() + .plan(); let commit_version = plan.execute().await?; commit_versions.insert(lock.lock_version, commit_version); commit_version @@ -96,7 +95,7 @@ pub async fn resolve_locks( lock.lock_version, commit_version, pd_client.clone(), - api_version, + keyspace, ) .await?; clean_regions @@ -112,7 +111,7 @@ async fn resolve_lock_with_retry( start_version: u64, commit_version: u64, pd_client: Arc, - api_version: APIVersion, + keyspace: Keyspace, ) -> Result { debug!("resolving locks with retry"); // FIXME: Add backoff @@ -123,10 +122,10 @@ async fn resolve_lock_with_retry( let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); // The only place where single-region is used - let plan = crate::request::PlanBuilder::new(pd_client.clone(), api_version, request) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) .single_region_with_store(store) .await? - .resolve_lock(Backoff::no_backoff(), api_version) + .resolve_lock(Backoff::no_backoff(), keyspace) .extract_error() .plan(); match plan.execute().await { @@ -218,7 +217,7 @@ impl LockResolver { store: RegionStore, locks: Vec, pd_client: Arc, // TODO: make pd_client a member of LockResolver - api_version: APIVersion, + keyspace: Keyspace, ) -> Result<()> { if locks.is_empty() { return Ok(()); @@ -240,7 +239,7 @@ impl LockResolver { let mut status = self .check_txn_status( pd_client.clone(), - api_version, + keyspace, txn_id, l.primary_lock.clone(), 0, @@ -257,7 +256,7 @@ impl LockResolver { let secondary_status = self .check_all_secondaries( pd_client.clone(), - api_version, + keyspace, lock_info.secondaries.clone(), txn_id, ) @@ -278,7 +277,7 @@ impl LockResolver { status = self .check_txn_status( pd_client.clone(), - api_version, + keyspace, txn_id, l.primary_lock, 0, @@ -327,7 +326,7 @@ impl LockResolver { txn_info_vec.push(txn_info); } let cleaned_region = self - .batch_resolve_locks(pd_client.clone(), api_version, store.clone(), txn_info_vec) + .batch_resolve_locks(pd_client.clone(), keyspace, store.clone(), txn_info_vec) .await?; for txn_id in txn_ids { self.ctx @@ -342,7 +341,7 @@ impl LockResolver { pub async fn check_txn_status( &mut self, pd_client: Arc, - api_version: APIVersion, + keyspace: Keyspace, txn_id: u64, primary: Vec, caller_start_ts: u64, @@ -372,7 +371,7 @@ impl LockResolver { force_sync_commit, resolving_pessimistic_lock, ); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), api_version, req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .extract_error() @@ -392,12 +391,12 @@ impl LockResolver { async fn check_all_secondaries( &mut self, pd_client: Arc, - api_version: APIVersion, + keyspace: Keyspace, keys: Vec>, txn_id: u64, ) -> Result { let req = new_check_secondary_locks_request(keys, txn_id); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), api_version, req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .merge(Collect) @@ -408,13 +407,13 @@ impl LockResolver { async fn batch_resolve_locks( &mut self, pd_client: Arc, - api_version: APIVersion, + keyspace: Keyspace, store: RegionStore, txn_infos: Vec, ) -> Result { let ver_id = store.region_with_leader.ver_id(); let request = requests::new_batch_resolve_lock_request(txn_infos.clone()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), api_version, request) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) .single_region_with_store(store.clone()) .await? .extract_error() @@ -442,11 +441,11 @@ mod tests { use crate::proto::errorpb; #[rstest::rstest] - #[case(APIVersion::V1)] - #[case(APIVersion::V2 { keyspace_id: 0 })] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] #[serial] - async fn test_resolve_lock_with_retry(#[case] api_version: APIVersion) { + async fn test_resolve_lock_with_retry(#[case] keyspace: Keyspace) { // Test resolve lock within retry limit fail::cfg("region-error", "9*return").unwrap(); @@ -465,7 +464,7 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone(), api_version) + let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone(), keyspace) .await .unwrap(); assert_eq!(region1.ver_id(), resolved_region); @@ -473,7 +472,7 @@ mod tests { // Test resolve lock over retry limit fail::cfg("region-error", "10*return").unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, client, api_version) + resolve_lock_with_retry(&key, 3, 4, client, keyspace) .await .expect_err("should return error"); } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 1e2dae24..cae46179 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -19,17 +19,17 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; -use crate::request::APIVersion; use crate::request::Collect; use crate::request::CollectError; use crate::request::CollectSingle; use crate::request::CollectWithShard; -use crate::request::EncodeVersion; +use crate::request::EncodeKeyspace; use crate::request::KeyMode; +use crate::request::Keyspace; use crate::request::Plan; use crate::request::PlanBuilder; use crate::request::RetryOptions; -use crate::request::TruncateVersion; +use crate::request::TruncateKeyspace; use crate::timestamp::TimestampExt; use crate::transaction::buffer::Buffer; use crate::transaction::lowering::*; @@ -84,7 +84,7 @@ pub struct Transaction { buffer: Buffer, rpc: Arc, options: TransactionOptions, - api_version: APIVersion, + keyspace: Keyspace, is_heartbeat_started: bool, start_instant: Instant, } @@ -94,7 +94,7 @@ impl Transaction { timestamp: Timestamp, rpc: Arc, options: TransactionOptions, - api_version: APIVersion, + keyspace: Keyspace, ) -> Transaction { let status = if options.read_only { TransactionStatus::ReadOnly @@ -107,7 +107,7 @@ impl Transaction { buffer: Buffer::new(options.is_pessimistic()), rpc, options, - api_version, + keyspace, is_heartbeat_started: false, start_instant: std::time::Instant::now(), } @@ -136,15 +136,15 @@ impl Transaction { self.check_allow_operation().await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); - let key = key.into().encode_version(self.api_version, KeyMode::Txn); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); let retry_options = self.options.retry_options.clone(); - let api_version = self.api_version; + let keyspace = self.keyspace; self.buffer .get_or_else(key, |key| async move { let request = new_get_request(key, timestamp); - let plan = PlanBuilder::new(rpc, api_version, request) - .resolve_lock(retry_options.lock_backoff, api_version) + let plan = PlanBuilder::new(rpc, keyspace, request) + .resolve_lock(retry_options.lock_backoff, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .post_process_default() @@ -204,7 +204,7 @@ impl Transaction { self.lock_keys(iter::once(key.clone())).await?; self.get(key).await } else { - let key = key.into().encode_version(self.api_version, KeyMode::Txn); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); let mut pairs = self.pessimistic_lock(iter::once(key), true).await?; debug_assert!(pairs.len() <= 1); match pairs.pop() { @@ -269,17 +269,17 @@ impl Transaction { self.check_allow_operation().await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); - let api_version = self.api_version; + let keyspace = self.keyspace; let keys = keys .into_iter() - .map(move |k| k.into().encode_version(api_version, KeyMode::Txn)); + .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn)); let retry_options = self.options.retry_options.clone(); self.buffer .batch_get_or_else(keys, move |keys| async move { let request = new_batch_get_request(keys, timestamp); - let plan = PlanBuilder::new(rpc, api_version, request) - .resolve_lock(retry_options.lock_backoff, api_version) + let plan = PlanBuilder::new(rpc, keyspace, request) + .resolve_lock(retry_options.lock_backoff, keyspace) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -288,7 +288,7 @@ impl Transaction { .map(|r| r.into_iter().map(Into::into).collect()) }) .await - .map(move |pairs| pairs.map(move |pair| pair.truncate_version(api_version))) + .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace))) } /// Create a new 'batch get for update' request. @@ -329,14 +329,14 @@ impl Transaction { self.lock_keys(keys.clone()).await?; Ok(self.batch_get(keys).await?.collect()) } else { - let api_version = self.api_version; + let keyspace = self.keyspace; let keys = keys .into_iter() - .map(move |k| k.into().encode_version(api_version, KeyMode::Txn)); + .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn)); let pairs = self .pessimistic_lock(keys, true) .await? - .truncate_version(api_version); + .truncate_keyspace(keyspace); Ok(pairs) } } @@ -463,7 +463,7 @@ impl Transaction { pub async fn put(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking transactional put request"); self.check_allow_operation().await?; - let key = key.into().encode_version(self.api_version, KeyMode::Txn); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) .await?; @@ -494,7 +494,7 @@ impl Transaction { pub async fn insert(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking transactional insert request"); self.check_allow_operation().await?; - let key = key.into().encode_version(self.api_version, KeyMode::Txn); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); if self.buffer.get(&key).is_some() { return Err(Error::DuplicateKeyInsertion); } @@ -529,7 +529,7 @@ impl Transaction { pub async fn delete(&mut self, key: impl Into) -> Result<()> { debug!("invoking transactional delete request"); self.check_allow_operation().await?; - let key = key.into().encode_version(self.api_version, KeyMode::Txn); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) .await?; @@ -566,7 +566,7 @@ impl Transaction { self.check_allow_operation().await?; let mutations: Vec = mutations .into_iter() - .map(|mutation| mutation.encode_version(self.api_version, KeyMode::Txn)) + .map(|mutation| mutation.encode_keyspace(self.keyspace, KeyMode::Txn)) .collect(); if self.is_pessimistic() { self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false) @@ -611,10 +611,10 @@ impl Transaction { ) -> Result<()> { debug!("invoking transactional lock_keys request"); self.check_allow_operation().await?; - let api_version = self.api_version; + let keyspace = self.keyspace; let keys = keys .into_iter() - .map(move |k| k.into().encode_version(api_version, KeyMode::Txn)); + .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn)); match self.options.kind { TransactionKind::Optimistic => { for key in keys { @@ -672,7 +672,7 @@ impl Transaction { self.timestamp.clone(), self.rpc.clone(), self.options.clone(), - self.api_version, + self.keyspace, self.buffer.get_write_size() as u64, self.start_instant, ) @@ -725,7 +725,7 @@ impl Transaction { self.timestamp.clone(), self.rpc.clone(), self.options.clone(), - self.api_version, + self.keyspace, self.buffer.get_write_size() as u64, self.start_instant, ) @@ -759,10 +759,10 @@ impl Transaction { primary_key, self.start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .resolve_lock( self.options.retry_options.lock_backoff.clone(), - self.api_version, + self.keyspace, ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -783,8 +783,8 @@ impl Transaction { let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); let retry_options = self.options.retry_options.clone(); - let api_version = self.api_version; - let range = range.into().encode_version(self.api_version, KeyMode::Txn); + let keyspace = self.keyspace; + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); self.buffer .scan_and_fetch( @@ -795,8 +795,8 @@ impl Transaction { move |new_range, new_limit| async move { let request = new_scan_request(new_range, timestamp, new_limit, key_only, reverse); - let plan = PlanBuilder::new(rpc, api_version, request) - .resolve_lock(retry_options.lock_backoff, api_version) + let plan = PlanBuilder::new(rpc, keyspace, request) + .resolve_lock(retry_options.lock_backoff, keyspace) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -806,7 +806,7 @@ impl Transaction { }, ) .await - .map(move |pairs| pairs.map(move |pair| pair.truncate_version(api_version))) + .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace))) } /// Pessimistically lock the keys, and optionally retrieve corresponding values. @@ -851,10 +851,10 @@ impl Transaction { for_update_ts.clone(), need_value, ); - let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .resolve_lock( self.options.retry_options.lock_backoff.clone(), - self.api_version, + self.keyspace, ) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) @@ -908,10 +908,10 @@ impl Transaction { start_version, for_update_ts, ); - let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, req) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .resolve_lock( self.options.retry_options.lock_backoff.clone(), - self.api_version, + self.keyspace, ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -960,7 +960,7 @@ impl Transaction { HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval, }; let start_instant = self.start_instant; - let api_version = self.api_version; + let keyspace = self.keyspace; let heartbeat_task = async move { loop { @@ -981,7 +981,7 @@ impl Transaction { primary_key.clone(), start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let plan = PlanBuilder::new(rpc.clone(), api_version, request) + let plan = PlanBuilder::new(rpc.clone(), keyspace, request) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) .plan(); @@ -1257,7 +1257,7 @@ struct Committer { start_version: Timestamp, rpc: Arc, options: TransactionOptions, - api_version: APIVersion, + keyspace: Keyspace, #[new(default)] undetermined: bool, write_size: u64, @@ -1335,10 +1335,10 @@ impl Committer { .collect(); // FIXME set max_commit_ts and min_commit_ts - let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, request) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .resolve_lock( self.options.retry_options.lock_backoff.clone(), - self.api_version, + self.keyspace, ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) @@ -1378,10 +1378,10 @@ impl Committer { self.start_version.clone(), commit_version.clone(), ); - let plan = PlanBuilder::new(self.rpc.clone(), self.api_version, req) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .resolve_lock( self.options.retry_options.lock_backoff.clone(), - self.api_version, + self.keyspace, ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -1445,8 +1445,8 @@ impl Committer { .filter(|key| &primary_key != key); new_commit_request(keys, self.start_version, commit_version) }; - let plan = PlanBuilder::new(self.rpc, self.api_version, req) - .resolve_lock(self.options.retry_options.lock_backoff, self.api_version) + let plan = PlanBuilder::new(self.rpc, self.keyspace, req) + .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1466,8 +1466,8 @@ impl Committer { match self.options.kind { TransactionKind::Optimistic => { let req = new_batch_rollback_request(keys, self.start_version); - let plan = PlanBuilder::new(self.rpc, self.api_version, req) - .resolve_lock(self.options.retry_options.lock_backoff, self.api_version) + let plan = PlanBuilder::new(self.rpc, self.keyspace, req) + .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1475,8 +1475,8 @@ impl Committer { } TransactionKind::Pessimistic(for_update_ts) => { let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); - let plan = PlanBuilder::new(self.rpc, self.api_version, req) - .resolve_lock(self.options.retry_options.lock_backoff, self.api_version) + let plan = PlanBuilder::new(self.rpc, self.keyspace, req) + .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1546,16 +1546,16 @@ mod tests { use crate::mock::MockPdClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; - use crate::request::APIVersion; + use crate::request::Keyspace; use crate::transaction::HeartbeatOption; use crate::Transaction; use crate::TransactionOptions; #[rstest::rstest] - #[case(APIVersion::V1)] - #[case(APIVersion::V2 { keyspace_id: 0 })] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] - async fn test_optimistic_heartbeat(#[case] api_version: APIVersion) -> Result<(), io::Error> { + async fn test_optimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> { let scenario = FailScenario::setup(); fail::cfg("after-prewrite", "sleep(1500)").unwrap(); let heartbeats = Arc::new(AtomicUsize::new(0)); @@ -1578,7 +1578,7 @@ mod tests { pd_client, TransactionOptions::new_optimistic() .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))), - api_version, + keyspace, ); heartbeat_txn.put(key1.clone(), "foo").await.unwrap(); let heartbeat_txn_handle = tokio::task::spawn_blocking(move || { @@ -1592,10 +1592,10 @@ mod tests { } #[rstest::rstest] - #[case(APIVersion::V1)] - #[case(APIVersion::V2 { keyspace_id: 0 })] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] - async fn test_pessimistic_heartbeat(#[case] api_version: APIVersion) -> Result<(), io::Error> { + async fn test_pessimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> { let heartbeats = Arc::new(AtomicUsize::new(0)); let heartbeats_cloned = heartbeats.clone(); let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( @@ -1621,7 +1621,7 @@ mod tests { pd_client, TransactionOptions::new_pessimistic() .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))), - api_version, + keyspace, ); heartbeat_txn.put(key1.clone(), "foo").await.unwrap(); assert_eq!(heartbeats.load(Ordering::SeqCst), 0);