diff --git a/Cargo.toml b/Cargo.toml index 0aab2b03..b2efb2b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,9 +38,12 @@ prometheus = { version = "0.13", default-features = false } prost = "0.12" rand = "0.8" regex = "1" +reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] } semver = "1.0" serde = "1.0" serde_derive = "1.0" +serde_json = "1" +take_mut = "0.2.2" thiserror = "1" tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] } tonic = { version = "0.10", features = ["tls"] } @@ -51,9 +54,7 @@ env_logger = "0.10" fail = { version = "0.4", features = ["failpoints"] } proptest = "1" proptest-derive = "0.3" -reqwest = { version = "0.11", default-features = false, features = [ - "native-tls-vendored", -] } +rstest = "0.18.2" serde_json = "1" serial_test = "0.5.0" simple_logger = "1" diff --git a/config/tikv.toml b/config/tikv.toml index 52965253..c0f97d64 100644 --- a/config/tikv.toml +++ b/config/tikv.toml @@ -15,3 +15,7 @@ max-open-files = 10000 [raftdb] max-open-files = 10000 + +[storage] +api-version = 2 +enable-ttl = true diff --git a/examples/pessimistic.rs b/examples/pessimistic.rs index 95e5e6f5..824710e0 100644 --- a/examples/pessimistic.rs +++ b/examples/pessimistic.rs @@ -24,7 +24,9 @@ async fn main() { Config::default().with_security(ca, cert, key) } else { Config::default() - }; + } + // This example uses the default keyspace, so api-v2 must be enabled on the server. + .with_default_keyspace(); // init let client = Client::new_with_config(args.pd, config) diff --git a/examples/raw.rs b/examples/raw.rs index e5b24e1f..cb3bdcd8 100644 --- a/examples/raw.rs +++ b/examples/raw.rs @@ -31,7 +31,9 @@ async fn main() -> Result<()> { Config::default().with_security(ca, cert, key) } else { Config::default() - }; + } + // This example uses the default keyspace, so api-v2 must be enabled on the server. + .with_default_keyspace(); // When we first create a client we receive a `Connect` structure which must be resolved before // the client is actually connected and usable. @@ -136,6 +138,8 @@ async fn main() -> Result<()> { ); println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}"); - // Cleanly exit. + // Delete all keys in the whole range. + client.delete_range("".to_owned().."".to_owned()).await?; + Ok(()) } diff --git a/examples/transaction.rs b/examples/transaction.rs index 119fc88c..cd3528e8 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -87,7 +87,9 @@ async fn main() { Config::default().with_security(ca, cert, key) } else { Config::default() - }; + } + // This example uses the default keyspace, so api-v2 must be enabled on the server. + .with_default_keyspace(); let txn = Client::new_with_config(args.pd, config) .await diff --git a/src/common/errors.rs b/src/common/errors.rs index 29622c92..59f55776 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -50,9 +50,14 @@ pub enum Error { /// Wraps a `grpcio::Error`. #[error("gRPC error: {0}")] Grpc(#[from] tonic::transport::Error), + /// Wraps a `reqwest::Error`. + #[error("http error: {0}")] + Http(#[from] reqwest::Error), /// Wraps a `grpcio::Error`. #[error("gRPC api error: {0}")] GrpcAPI(#[from] tonic::Status), + #[error("Http request failed: unknown respond {0}")] + UnknownHttpRespond(String), /// Wraps a `grpcio::Error`. #[error("url error: {0}")] Url(#[from] tonic::codegen::http::uri::InvalidUri), diff --git a/src/config.rs b/src/config.rs index 1be273cc..79dad855 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,6 +19,7 @@ pub struct Config { pub cert_path: Option, pub key_path: Option, pub timeout: Duration, + pub keyspace: Option, } const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2); @@ -30,6 +31,7 @@ impl Default for Config { cert_path: None, key_path: None, timeout: DEFAULT_REQUEST_TIMEOUT, + keyspace: None, } } } @@ -83,4 +85,21 @@ impl Config { self.timeout = timeout; self } + + /// Set to use default keyspace. + /// + /// Server should enable `storage.api-version = 2` to use this feature. + #[must_use] + pub fn with_default_keyspace(self) -> Self { + self.with_keyspace("DEFAULT") + } + + /// Set the use keyspace for the client. + /// + /// Server should enable `storage.api-version = 2` to use this feature. + #[must_use] + pub fn with_keyspace(mut self, keyspace: &str) -> Self { + self.keyspace = Some(keyspace.to_owned()); + self + } } diff --git a/src/kv/bound_range.rs b/src/kv/bound_range.rs index 1cf5ac81..b99cba36 100644 --- a/src/kv/bound_range.rs +++ b/src/kv/bound_range.rs @@ -136,17 +136,11 @@ impl BoundRange { pub fn into_keys(self) -> (Key, Option) { let start = match self.from { Bound::Included(v) => v, - Bound::Excluded(mut v) => { - v.push_zero(); - v - } + Bound::Excluded(v) => v.next_key(), Bound::Unbounded => Key::EMPTY, }; let end = match self.to { - Bound::Included(mut v) => { - v.push_zero(); - Some(v) - } + Bound::Included(v) => Some(v.next_key()), Bound::Excluded(v) => Some(v), Bound::Unbounded => None, }; diff --git a/src/kv/key.rs b/src/kv/key.rs index 7ee16597..1b4f0606 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -71,7 +71,7 @@ pub struct Key( test, proptest(strategy = "any_with::>((size_range(_PROPTEST_KEY_MAX), ()))") )] - pub(super) Vec, + pub(crate) Vec, ); impl AsRef for kvrpcpb::Mutation { @@ -98,10 +98,11 @@ impl Key { /// Push a zero to the end of the key. /// - /// Extending a zero makes the new key the smallest key that is greater than than the original one, i.e. the succeeder. + /// Extending a zero makes the new key the smallest key that is greater than than the original one. #[inline] - pub(super) fn push_zero(&mut self) { - self.0.push(0) + pub(crate) fn next_key(mut self) -> Self { + self.0.push(0); + self } /// Convert the key to a lower bound. The key is treated as inclusive. diff --git a/src/lib.rs b/src/lib.rs index a2acf57b..60dc2956 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,8 +94,6 @@ pub mod backoff; #[doc(hidden)] -pub mod proto; // export `proto` to enable user customized codec -#[doc(hidden)] pub mod raw; pub mod request; #[doc(hidden)] @@ -106,6 +104,7 @@ mod compat; mod config; mod kv; mod pd; +mod proto; mod region; mod region_cache; mod stats; @@ -146,8 +145,6 @@ pub use crate::raw::Client as RawClient; #[doc(inline)] pub use crate::raw::ColumnFamily; #[doc(inline)] -pub use crate::request::codec; -#[doc(inline)] pub use crate::request::RetryOptions; #[doc(inline)] pub use crate::timestamp::Timestamp; diff --git a/src/mock.rs b/src/mock.rs index f9c94aef..35245541 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -18,7 +18,6 @@ use crate::proto::metapb::RegionEpoch; use crate::proto::metapb::{self}; use crate::region::RegionId; use crate::region::RegionWithLeader; -use crate::request::codec::ApiV1TxnCodec; use crate::store::KvConnect; use crate::store::RegionStore; use crate::store::Request; @@ -31,7 +30,7 @@ use crate::Timestamp; /// Create a `PdRpcClient` with it's internals replaced with mocks so that the /// client can be tested without doing any RPC calls. -pub async fn pd_rpc_client() -> PdRpcClient { +pub async fn pd_rpc_client() -> PdRpcClient { let config = Config::default(); PdRpcClient::new( config.clone(), @@ -44,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient MockPdClient { - MockPdClient { - client, - codec: ApiV1TxnCodec::default(), - } - } } #[async_trait] @@ -113,7 +102,6 @@ impl MockPdClient { pub fn default() -> MockPdClient { MockPdClient { client: MockKvClient::default(), - codec: ApiV1TxnCodec::default(), } } @@ -177,7 +165,6 @@ impl MockPdClient { #[async_trait] impl PdClient for MockPdClient { - type Codec = ApiV1TxnCodec; type KvClient = MockKvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { @@ -228,7 +215,7 @@ impl PdClient for MockPdClient { async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {} - fn get_codec(&self) -> &Self::Codec { - &self.codec + async fn get_keyspace_id(&self, _keyspace: &str) -> Result { + unimplemented!() } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 5461cb57..2caac29b 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -20,7 +20,6 @@ use crate::region::RegionId; use crate::region::RegionVerId; use crate::region::RegionWithLeader; use crate::region_cache::RegionCache; -use crate::request::codec::{ApiV1TxnCodec, Codec}; use crate::store::KvConnect; use crate::store::RegionStore; use crate::store::TikvConnect; @@ -51,7 +50,6 @@ use crate::Timestamp; /// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff. #[async_trait] pub trait PdClient: Send + Sync + 'static { - type Codec: Codec; type KvClient: KvClient + Send + Sync + 'static; /// In transactional API, `region` is decoded (keys in raw format). @@ -67,6 +65,8 @@ pub trait PdClient: Send + Sync + 'static { async fn update_safepoint(self: Arc, safepoint: u64) -> Result; + async fn get_keyspace_id(&self, keyspace: &str) -> Result; + /// In transactional API, `key` is in raw format async fn store_for_key(self: Arc, key: &Key) -> Result { let region = self.region_for_key(key).await?; @@ -193,11 +193,8 @@ pub trait PdClient: Send + Sync + 'static { .boxed() } - fn decode_region( - mut region: RegionWithLeader, - enable_mvcc_codec: bool, - ) -> Result { - if enable_mvcc_codec { + fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result { + if enable_codec { codec::decode_bytes_in_place(&mut region.region.start_key, false)?; codec::decode_bytes_in_place(&mut region.region.end_key, false)?; } @@ -207,30 +204,20 @@ pub trait PdClient: Send + Sync + 'static { async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>; async fn invalidate_region_cache(&self, ver_id: RegionVerId); - - /// Get the codec carried by `PdClient`. - /// The purpose of carrying the codec is to avoid passing it on so many calling paths. - fn get_codec(&self) -> &Self::Codec; } /// This client converts requests for the logical TiKV cluster into requests /// for a single TiKV store using PD and internal logic. -pub struct PdRpcClient< - Cod: Codec = ApiV1TxnCodec, - KvC: KvConnect + Send + Sync + 'static = TikvConnect, - Cl = Cluster, -> { +pub struct PdRpcClient { pd: Arc>, kv_connect: KvC, kv_client_cache: Arc>>, - enable_mvcc_codec: bool, + enable_codec: bool, region_cache: RegionCache>, - codec: Option, } #[async_trait] -impl PdClient for PdRpcClient { - type Codec = Cod; +impl PdClient for PdRpcClient { type KvClient = KvC::KvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { @@ -241,20 +228,20 @@ impl PdClient for PdRpcClien } async fn region_for_key(&self, key: &Key) -> Result { - let enable_mvcc_codec = self.enable_mvcc_codec; - let key = if enable_mvcc_codec { + let enable_codec = self.enable_codec; + let key = if enable_codec { key.to_encoded() } else { key.clone() }; let region = self.region_cache.get_region_by_key(&key).await?; - Self::decode_region(region, enable_mvcc_codec) + Self::decode_region(region, enable_codec) } async fn region_for_id(&self, id: RegionId) -> Result { let region = self.region_cache.get_region_by_id(id).await?; - Self::decode_region(region, self.enable_mvcc_codec) + Self::decode_region(region, self.enable_codec) } async fn all_stores(&self) -> Result> { @@ -283,39 +270,34 @@ impl PdClient for PdRpcClien self.region_cache.invalidate_region_cache(ver_id).await } - fn get_codec(&self) -> &Self::Codec { - self.codec - .as_ref() - .unwrap_or_else(|| panic!("codec not set")) + async fn get_keyspace_id(&self, keyspace: &str) -> Result { + self.pd.get_keyspace_id(keyspace).await } } -impl PdRpcClient { +impl PdRpcClient { pub async fn connect( pd_endpoints: &[String], config: Config, - enable_mvcc_codec: bool, // TODO: infer from `codec`. - codec: Option, - ) -> Result> { + enable_codec: bool, + ) -> Result { PdRpcClient::new( config.clone(), |security_mgr| TikvConnect::new(security_mgr, config.timeout), |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout), - enable_mvcc_codec, - codec, + enable_codec, ) .await } } -impl PdRpcClient { +impl PdRpcClient { pub async fn new( config: Config, kv_connect: MakeKvC, pd: MakePd, - enable_mvcc_codec: bool, - codec: Option, - ) -> Result> + enable_codec: bool, + ) -> Result> where PdFut: Future>>, MakeKvC: FnOnce(Arc) -> KvC, @@ -337,9 +319,8 @@ impl PdRpcClient PdRpcClient Err(e), } } - - pub fn set_codec(&mut self, codec: Cod) { - self.codec = Some(codec); - } } fn make_key_range(start_key: Vec, end_key: Vec) -> kvrpcpb::KeyRange { diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs index 3df4d255..4ad841c2 100644 --- a/src/pd/cluster.rs +++ b/src/pd/cluster.rs @@ -16,6 +16,7 @@ use tonic::Request; use super::timestamp::TimestampOracle; use crate::internal_err; use crate::proto::pdpb; +use crate::Error; use crate::Result; use crate::SecurityManager; use crate::Timestamp; @@ -24,6 +25,7 @@ use crate::Timestamp; pub struct Cluster { id: u64, client: pdpb::pd_client::PdClient, + endpoint: String, members: pdpb::GetMembersResponse, tso: TimestampOracle, } @@ -91,6 +93,18 @@ impl Cluster { req.safe_point = safepoint; req.send(&mut self.client, timeout).await } + + pub async fn get_keyspace_id(&self, keyspace: &str) -> Result { + let resp = + reqwest::get(format!("{}/pd/api/v2/keyspaces/{keyspace}", self.endpoint)).await?; + let body = resp.json::().await?; + let keyspace_id = body + .get("id") + .ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))? + .as_u64() + .ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?; + Ok(keyspace_id as u32) + } } /// An object for connecting and reconnecting to a PD cluster. @@ -109,12 +123,13 @@ impl Connection { timeout: Duration, ) -> Result { let members = self.validate_endpoints(endpoints, timeout).await?; - let (client, members) = self.try_connect_leader(&members, timeout).await?; + let (client, endpoint, members) = self.try_connect_leader(&members, timeout).await?; let id = members.header.as_ref().unwrap().cluster_id; let tso = TimestampOracle::new(id, &client)?; let cluster = Cluster { id, client, + endpoint, members, tso, }; @@ -125,11 +140,13 @@ impl Connection { pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> { warn!("updating pd client"); let start = Instant::now(); - let (client, members) = self.try_connect_leader(&cluster.members, timeout).await?; + let (client, endpoint, members) = + self.try_connect_leader(&cluster.members, timeout).await?; let tso = TimestampOracle::new(cluster.id, &client)?; *cluster = Cluster { id: cluster.id, client, + endpoint, members, tso, }; @@ -239,7 +256,11 @@ impl Connection { &self, previous: &pdpb::GetMembersResponse, timeout: Duration, - ) -> Result<(pdpb::pd_client::PdClient, pdpb::GetMembersResponse)> { + ) -> Result<( + pdpb::pd_client::PdClient, + String, + pdpb::GetMembersResponse, + )> { let previous_leader = previous.leader.as_ref().unwrap(); let members = &previous.members; let cluster_id = previous.header.as_ref().unwrap().cluster_id; @@ -269,9 +290,10 @@ impl Connection { if let Some(resp) = resp { let leader = resp.leader.as_ref().unwrap(); for ep in &leader.client_urls { - let r = self.try_connect(ep.as_str(), cluster_id, timeout).await; - if r.is_ok() { - return r; + if let Ok((client, members)) = + self.try_connect(ep.as_str(), cluster_id, timeout).await + { + return Ok((client, ep.to_string(), members)); } } } diff --git a/src/pd/retry.rs b/src/pd/retry.rs index 3c17a49e..8eaccc11 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -45,6 +45,8 @@ pub trait RetryClientTrait { async fn get_timestamp(self: Arc) -> Result; async fn update_safepoint(self: Arc, safepoint: u64) -> Result; + + async fn get_keyspace_id(&self, keyspace: &str) -> Result; } /// Client for communication with a PD cluster. Has the facility to reconnect to the cluster. pub struct RetryClient { @@ -197,6 +199,12 @@ impl RetryClientTrait for RetryClient { .map(|resp| resp.new_safe_point == safepoint) }) } + + async fn get_keyspace_id(&self, keyspace: &str) -> Result { + retry!(self, "get_keyspace_id", |cluster| async { + cluster.get_keyspace_id(keyspace).await + }) + } } impl fmt::Debug for RetryClient { diff --git a/src/raw/client.rs b/src/raw/client.rs index fc733015..3b91f99f 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -15,10 +15,13 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::metapb; use crate::raw::lowering::*; -use crate::request::codec::{ApiV1RawCodec, Codec, EncodedRequest}; use crate::request::Collect; use crate::request::CollectSingle; +use crate::request::EncodeKeyspace; +use crate::request::KeyMode; +use crate::request::Keyspace; use crate::request::Plan; +use crate::request::TruncateKeyspace; use crate::Backoff; use crate::BoundRange; use crate::ColumnFamily; @@ -36,16 +39,13 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; /// /// The returned results of raw request methods are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client> -where - Cod: Codec, - PdC: PdClient, -{ +pub struct Client { rpc: Arc, cf: Option, backoff: Backoff, /// Whether to use the [`atomic mode`](Client::with_atomic_for_cas). atomic: bool, + keyspace: Keyspace, } impl Clone for Client { @@ -55,11 +55,12 @@ impl Clone for Client { cf: self.cf.clone(), backoff: self.backoff.clone(), atomic: self.atomic, + keyspace: self.keyspace, } } } -impl Client> { +impl Client { /// Create a raw [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -104,16 +105,23 @@ impl Client> { pd_endpoints: Vec, config: Config, ) -> Result { + let enable_codec = config.keyspace.is_some(); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let rpc = Arc::new( - PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1RawCodec::default())) - .await?, - ); + let rpc = + Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), enable_codec).await?); + let keyspace = match config.keyspace { + Some(keyspace) => { + let keyspace_id = rpc.get_keyspace_id(&keyspace).await?; + Keyspace::Enable { keyspace_id } + } + None => Keyspace::Disable, + }; Ok(Client { rpc, cf: None, backoff: DEFAULT_REGION_BACKOFF, atomic: false, + keyspace, }) } @@ -148,11 +156,10 @@ impl Client> { cf: Some(cf), backoff: self.backoff.clone(), atomic: self.atomic, + keyspace: self.keyspace, } } -} -impl Client> { /// Set the [`Backoff`] strategy for retrying requests. /// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF). /// See [`Backoff`] for more information. @@ -178,6 +185,7 @@ impl Client> { cf: self.cf.clone(), backoff, atomic: self.atomic, + keyspace: self.keyspace, } } @@ -195,11 +203,12 @@ impl Client> { cf: self.cf.clone(), backoff: self.backoff.clone(), atomic: true, + keyspace: self.keyspace, } } } -impl> Client { +impl Client { /// Create a new 'get' request. /// /// Once resolved this request will result in the fetching of the value associated with the @@ -220,9 +229,9 @@ impl> Client { /// ``` pub async fn get(&self, key: impl Into) -> Result> { debug!("invoking raw get request"); - let request = new_raw_get_request(key.into(), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); + let request = new_raw_get_request(key, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -253,15 +262,19 @@ impl> Client { keys: impl IntoIterator>, ) -> Result> { debug!("invoking raw batch_get request"); - let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let keys = keys + .into_iter() + .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw)); + let request = new_raw_batch_get_request(keys, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); - plan.execute() - .await - .map(|r| r.into_iter().map(Into::into).collect()) + plan.execute().await.map(|r| { + r.into_iter() + .map(|pair| pair.truncate_keyspace(self.keyspace)) + .collect() + }) } /// Create a new 'put' request. @@ -282,9 +295,9 @@ impl> Client { /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking raw put request"); - let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); + let request = new_raw_put_request(key, value.into(), self.cf.clone(), self.atomic); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -315,13 +328,11 @@ impl> Client { pairs: impl IntoIterator>, ) -> Result<()> { debug!("invoking raw batch_put request"); - let request = new_raw_batch_put_request( - pairs.into_iter().map(Into::into), - self.cf.clone(), - self.atomic, - ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let pairs = pairs + .into_iter() + .map(|pair| pair.into().encode_keyspace(self.keyspace, KeyMode::Raw)); + let request = new_raw_batch_put_request(pairs, self.cf.clone(), self.atomic); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -348,9 +359,9 @@ impl> Client { /// ``` pub async fn delete(&self, key: impl Into) -> Result<()> { debug!("invoking raw delete request"); - let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); + let request = new_raw_delete_request(key, self.cf.clone(), self.atomic); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -379,10 +390,11 @@ impl> Client { pub async fn batch_delete(&self, keys: impl IntoIterator>) -> Result<()> { debug!("invoking raw batch_delete request"); self.assert_non_atomic()?; - let request = - new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let keys = keys + .into_iter() + .map(|k| k.into().encode_keyspace(self.keyspace, KeyMode::Raw)); + let request = new_raw_batch_delete_request(keys, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -408,9 +420,9 @@ impl> Client { pub async fn delete_range(&self, range: impl Into) -> Result<()> { debug!("invoking raw delete_range request"); self.assert_non_atomic()?; - let request = new_raw_delete_range_request(range.into(), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); + let request = new_raw_delete_range_request(range, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -560,14 +572,14 @@ impl> Client { ) -> Result<(Option, bool)> { debug!("invoking raw compare_and_swap request"); self.assert_atomic()?; + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Raw); let req = new_cas_request( - key.into(), + key, new_value.into(), previous_value.into(), self.cf.clone(), ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -581,22 +593,39 @@ impl> Client { copr_version_req: impl Into, ranges: impl IntoIterator>, request_builder: impl Fn(metapb::Region, Vec>) -> Vec + Send + Sync + 'static, - ) -> Result, Vec>)>> { + ) -> Result>, Vec)>> { let copr_version_req = copr_version_req.into(); semver::VersionReq::from_str(&copr_version_req)?; + let ranges = ranges + .into_iter() + .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw)); + let keyspace = self.keyspace; + let request_builder = move |region, ranges: Vec>| { + request_builder( + region, + ranges + .into_iter() + .map(|range| range.truncate_keyspace(keyspace)) + .collect(), + ) + }; let req = new_raw_coprocessor_request( copr_name.into(), copr_version_req, - ranges.into_iter().map(Into::into), + ranges, request_builder, ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .preserve_shard() .retry_multi_region(self.backoff.clone()) .post_process_default() .plan(); - plan.execute().await + Ok(plan + .execute() + .await? + .into_iter() + .map(|(ranges, data)| (ranges.truncate_keyspace(keyspace), data)) + .collect()) } async fn scan_inner( @@ -611,8 +640,9 @@ impl> Client { max_limit: MAX_RAW_KV_SCAN_LIMIT, }); } + + let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let mut result = Vec::new(); - let mut cur_range = range.into(); let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed(); let mut region_store = scan_regions @@ -622,11 +652,11 @@ impl> Client { range: (cur_range.clone()), })??; let mut cur_limit = limit; + while cur_limit > 0 { let request = new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let resp = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .single_region_with_store(region_store.clone()) .await? .plan() @@ -639,6 +669,7 @@ impl> Client { .collect::>(); let res_len = region_scan_res.len(); result.append(&mut region_scan_res); + // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region if res_len < cur_limit as usize { region_store = match scan_regions.next().await { @@ -650,15 +681,20 @@ impl> Client { rs } Some(Err(e)) => return Err(e), - None => return Ok(result), + None => break, }; cur_limit -= res_len as u32; } else { break; } } + // limit is a soft limit, so we need check the number of results result.truncate(limit as usize); + + // truncate the prefix of keys + let result = result.truncate_keyspace(self.keyspace); + Ok(result) } @@ -675,18 +711,20 @@ impl> Client { }); } - let request = new_raw_batch_scan_request( - ranges.into_iter().map(Into::into), - each_limit, - key_only, - self.cf.clone(), - ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let ranges = ranges + .into_iter() + .map(|range| range.into().encode_keyspace(self.keyspace, KeyMode::Raw)); + + let request = new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); - plan.execute().await + plan.execute().await.map(|r| { + r.into_iter() + .map(|pair| pair.truncate_keyspace(self.keyspace)) + .collect() + }) } fn assert_non_atomic(&self) -> Result<()> { @@ -739,6 +777,7 @@ mod tests { cf: Some(ColumnFamily::Default), backoff: DEFAULT_REGION_BACKOFF, atomic: false, + keyspace: Keyspace::Enable { keyspace_id: 0 }, }; let resps = client .coprocessor( @@ -750,27 +789,17 @@ mod tests { .await?; let resps: Vec<_> = resps .into_iter() - .map(|(data, ranges)| (String::from_utf8(data).unwrap(), ranges)) + .map(|(ranges, data)| (ranges, String::from_utf8(data).unwrap())) .collect(); assert_eq!( resps, - vec![ - ( - "1:[Key(05)..Key(0A)]".to_string(), - vec![Key::from(vec![5])..Key::from(vec![10])] - ), - ( - "2:[Key(0A)..Key(0F), Key(14)..Key(FAFA)]".to_string(), - vec![ - Key::from(vec![10])..Key::from(vec![15]), - Key::from(vec![20])..Key::from(vec![250, 250]) - ] - ), - ( - "3:[Key(FAFA)..Key()]".to_string(), - vec![Key::from(vec![250, 250])..Key::from(vec![])] - ) - ] + vec![( + vec![ + Key::from(vec![5])..Key::from(vec![15]), + Key::from(vec![20])..Key::from(vec![]) + ], + "2:[Key(05)..Key(0F), Key(14)..Key()]".to_string(), + ),] ); Ok(()) } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 0be733cf..2d2677ee 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -13,10 +13,10 @@ use super::RawRpcRequest; use crate::collect_first; use crate::pd::PdClient; use crate::proto::kvrpcpb; -use crate::proto::kvrpcpb::ApiVersion; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; +use crate::region::RegionWithLeader; use crate::request::plan::ResponseWithShard; use crate::request::Collect; use crate::request::CollectSingle; @@ -164,7 +164,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.pairs = shard; Ok(()) } @@ -297,7 +297,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.ranges = shard; Ok(()) } @@ -399,11 +399,11 @@ impl Request for RawCoprocessorRequest { self.inner.as_any() } - fn set_context(&mut self, context: kvrpcpb::Context) { - self.inner.set_context(context); + fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> { + self.inner.set_leader(leader) } - fn set_api_version(&mut self, api_version: ApiVersion) { + fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { self.inner.set_api_version(api_version); } } @@ -423,7 +423,7 @@ impl Shardable for RawCoprocessorRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.inner.context = Some(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.inner.ranges = shard.clone(); self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard); Ok(()) @@ -435,7 +435,7 @@ impl Process>>>> for DefaultProcessor { - type Out = Vec<(Vec, Vec>)>; + type Out = Vec<(Vec>, Vec)>; fn process( &self, @@ -448,11 +448,11 @@ impl .map(|shard_resp| { shard_resp.map(|ResponseWithShard(resp, ranges)| { ( - resp.data, ranges .into_iter() .map(|range| range.start_key.into()..range.end_key.into()) .collect(), + resp.data, ) }) }) @@ -497,21 +497,21 @@ impl HasLocks for kvrpcpb::RawCoprocessorResponse {} mod test { use std::any::Any; - use futures::executor; - use super::*; use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::backoff::OPTIMISTIC_BACKOFF; use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; - use crate::request::codec::EncodedRequest; + use crate::request::Keyspace; use crate::request::Plan; use crate::Key; - #[test] - #[ignore] - fn test_raw_scan() { + #[rstest::rstest] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] + #[tokio::test] + async fn test_raw_scan(#[case] keyspace: Keyspace) { let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( |req: &dyn Any| { let req: &kvrpcpb::RawScanRequest = req.downcast_ref().unwrap(); @@ -540,15 +540,14 @@ mod test { key_only: true, ..Default::default() }; - let encoded_scan = EncodedRequest::new(scan, client.get_codec()); - let plan = crate::request::PlanBuilder::new(client, encoded_scan) - .resolve_lock(OPTIMISTIC_BACKOFF) + let plan = crate::request::PlanBuilder::new(client, keyspace, scan) + .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); - let scan = executor::block_on(async { plan.execute().await }).unwrap(); + let scan = plan.execute().await.unwrap(); - assert_eq!(scan.len(), 10); + assert_eq!(scan.len(), 49); // FIXME test the keys returned. } } diff --git a/src/region.rs b/src/region.rs index 8e58522c..6fb20321 100644 --- a/src/region.rs +++ b/src/region.rs @@ -2,7 +2,6 @@ use derive_new::new; -use crate::proto::kvrpcpb; use crate::proto::metapb; use crate::Error; use crate::Key; @@ -43,21 +42,6 @@ impl RegionWithLeader { key >= start_key.as_slice() && (key < end_key.as_slice() || end_key.is_empty()) } - pub fn context(&self) -> Result { - self.leader - .as_ref() - .ok_or(Error::LeaderNotFound { - region_id: self.region.id, - }) - .map(|l| { - let mut ctx = kvrpcpb::Context::default(); - ctx.region_id = self.region.id; - ctx.region_epoch = self.region.region_epoch.clone(); - ctx.peer = Some(l.clone()); - ctx - }) - } - pub fn start_key(&self) -> Key { self.region.start_key.to_vec().into() } diff --git a/src/region_cache.rs b/src/region_cache.rs index a557a96f..bbf8921f 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -346,6 +346,10 @@ mod test { async fn update_safepoint(self: Arc, _safepoint: u64) -> Result { todo!() } + + async fn get_keyspace_id(&self, _keyspace: &str) -> Result { + todo!() + } } #[tokio::test] diff --git a/src/request/codec.rs b/src/request/codec.rs deleted file mode 100644 index a409a8e7..00000000 --- a/src/request/codec.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. - -use crate::proto::kvrpcpb; -use crate::request::KvRequest; - -pub trait Codec: Clone + Sync + Send + 'static { - fn encode_request(&self, _req: &mut R) {} - // TODO: fn decode_response() -} - -#[derive(Clone, Default)] -pub struct ApiV1TxnCodec {} - -impl Codec for ApiV1TxnCodec {} - -#[derive(Clone, Default)] -pub struct ApiV1RawCodec {} - -impl Codec for ApiV1RawCodec {} - -#[derive(Clone)] -pub struct ApiV2TxnCodec { - _keyspace_id: u32, -} - -impl ApiV2TxnCodec { - pub fn new(keyspace_id: u32) -> Self { - Self { - _keyspace_id: keyspace_id, - } - } -} - -impl Codec for ApiV2TxnCodec { - fn encode_request(&self, req: &mut R) { - req.set_api_version(kvrpcpb::ApiVersion::V2); - // TODO: req.encode_request(self); - } -} - -// TODO: pub struct ApiV2RawCodec - -// EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake. -#[derive(Clone)] -pub struct EncodedRequest { - pub inner: Req, -} - -impl EncodedRequest { - pub fn new(mut req: Req, codec: &C) -> Self { - codec.encode_request(&mut req); - Self { inner: req } - } -} diff --git a/src/request/keyspace.rs b/src/request/keyspace.rs new file mode 100644 index 00000000..118e6fbf --- /dev/null +++ b/src/request/keyspace.rs @@ -0,0 +1,280 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use std::ops::{Bound, Range}; + +use serde_derive::{Deserialize, Serialize}; + +use crate::transaction::Mutation; +use crate::{proto::kvrpcpb, Key}; +use crate::{BoundRange, KvPair}; + +pub const RAW_KEY_PREFIX: u8 = b'r'; +pub const TXN_KEY_PREFIX: u8 = b'x'; +pub const KEYSPACE_PREFIX_LEN: usize = 4; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Keyspace { + Disable, + Enable { keyspace_id: u32 }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum KeyMode { + Raw, + Txn, +} + +impl Keyspace { + pub fn api_version(&self) -> kvrpcpb::ApiVersion { + match self { + Keyspace::Disable => kvrpcpb::ApiVersion::V1, + Keyspace::Enable { .. } => kvrpcpb::ApiVersion::V2, + } + } +} + +pub trait EncodeKeyspace { + fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self; +} + +pub trait TruncateKeyspace { + fn truncate_keyspace(self, keyspace: Keyspace) -> Self; +} + +impl EncodeKeyspace for Key { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + let prefix = match keyspace { + Keyspace::Disable => { + return self; + } + Keyspace::Enable { keyspace_id } => keyspace_prefix(keyspace_id, key_mode), + }; + + prepend_bytes(&mut self.0, &prefix); + + self + } +} + +impl EncodeKeyspace for KvPair { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + self.0 = self.0.encode_keyspace(keyspace, key_mode); + self + } +} + +impl EncodeKeyspace for BoundRange { + fn encode_keyspace(mut self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + self.from = match self.from { + Bound::Included(key) => Bound::Included(key.encode_keyspace(keyspace, key_mode)), + Bound::Excluded(key) => Bound::Excluded(key.encode_keyspace(keyspace, key_mode)), + Bound::Unbounded => { + let key = Key::from(vec![]); + Bound::Included(key.encode_keyspace(keyspace, key_mode)) + } + }; + self.to = match self.to { + Bound::Included(key) if !key.is_empty() => { + Bound::Included(key.encode_keyspace(keyspace, key_mode)) + } + Bound::Excluded(key) if !key.is_empty() => { + Bound::Excluded(key.encode_keyspace(keyspace, key_mode)) + } + _ => { + let key = Key::from(vec![]); + let keyspace = match keyspace { + Keyspace::Disable => Keyspace::Disable, + Keyspace::Enable { keyspace_id } => Keyspace::Enable { + keyspace_id: keyspace_id + 1, + }, + }; + Bound::Excluded(key.encode_keyspace(keyspace, key_mode)) + } + }; + self + } +} + +impl EncodeKeyspace for Mutation { + fn encode_keyspace(self, keyspace: Keyspace, key_mode: KeyMode) -> Self { + match self { + Mutation::Put(key, val) => Mutation::Put(key.encode_keyspace(keyspace, key_mode), val), + Mutation::Delete(key) => Mutation::Delete(key.encode_keyspace(keyspace, key_mode)), + } + } +} + +impl TruncateKeyspace for Key { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + if let Keyspace::Disable = keyspace { + return self; + } + + pretruncate_bytes::(&mut self.0); + + self + } +} + +impl TruncateKeyspace for KvPair { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + self.0 = self.0.truncate_keyspace(keyspace); + self + } +} + +impl TruncateKeyspace for Range { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + self.start = self.start.truncate_keyspace(keyspace); + self.end = self.end.truncate_keyspace(keyspace); + self + } +} + +impl TruncateKeyspace for Vec> { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + for range in &mut self { + take_mut::take(range, |range| range.truncate_keyspace(keyspace)); + } + self + } +} + +impl TruncateKeyspace for Vec { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + for pair in &mut self { + take_mut::take(pair, |pair| pair.truncate_keyspace(keyspace)); + } + self + } +} + +impl TruncateKeyspace for Vec { + fn truncate_keyspace(mut self, keyspace: Keyspace) -> Self { + for lock in &mut self { + take_mut::take(&mut lock.key, |key| { + Key::from(key).truncate_keyspace(keyspace).into() + }); + take_mut::take(&mut lock.primary_lock, |primary| { + Key::from(primary).truncate_keyspace(keyspace).into() + }); + for secondary in lock.secondaries.iter_mut() { + take_mut::take(secondary, |secondary| { + Key::from(secondary).truncate_keyspace(keyspace).into() + }); + } + } + self + } +} + +fn keyspace_prefix(keyspace_id: u32, key_mode: KeyMode) -> [u8; KEYSPACE_PREFIX_LEN] { + let mut prefix = keyspace_id.to_be_bytes(); + prefix[0] = match key_mode { + KeyMode::Raw => RAW_KEY_PREFIX, + KeyMode::Txn => TXN_KEY_PREFIX, + }; + prefix +} + +fn prepend_bytes(vec: &mut Vec, prefix: &[u8; N]) { + unsafe { + vec.reserve_exact(N); + std::ptr::copy(vec.as_ptr(), vec.as_mut_ptr().add(N), vec.len()); + std::ptr::copy_nonoverlapping(prefix.as_ptr(), vec.as_mut_ptr(), N); + vec.set_len(vec.len() + N); + } +} + +fn pretruncate_bytes(vec: &mut Vec) { + assert!(vec.len() >= N); + unsafe { + std::ptr::copy(vec.as_ptr().add(N), vec.as_mut_ptr(), vec.len() - N); + vec.set_len(vec.len() - N); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_keyspace_prefix() { + let key_mode = KeyMode::Raw; + assert_eq!(keyspace_prefix(0, key_mode), [b'r', 0, 0, 0]); + assert_eq!(keyspace_prefix(1, key_mode), [b'r', 0, 0, 1]); + assert_eq!(keyspace_prefix(0xFFFF, key_mode), [b'r', 0, 0xFF, 0xFF]); + + let key_mode = KeyMode::Txn; + assert_eq!(keyspace_prefix(0, key_mode), [b'x', 0, 0, 0]); + assert_eq!(keyspace_prefix(1, key_mode), [b'x', 0, 0, 1]); + assert_eq!(keyspace_prefix(0xFFFF, key_mode), [b'x', 0, 0xFF, 0xFF]); + } + + #[test] + fn test_encode_version() { + let keyspace = Keyspace::Enable { + keyspace_id: 0xDEAD, + }; + let key_mode = KeyMode::Raw; + + let key = Key::from(vec![0xBE, 0xEF]); + let expected_key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); + assert_eq!(key.encode_keyspace(keyspace, key_mode), expected_key); + + let bound: BoundRange = (Key::from(vec![0xDE, 0xAD])..Key::from(vec![0xBE, 0xEF])).into(); + let expected_bound: BoundRange = (Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xDE, 0xAD]) + ..Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF])) + .into(); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); + + let bound: BoundRange = (..).into(); + let expected_bound: BoundRange = + (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into(); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); + + let bound: BoundRange = (Key::from(vec![])..Key::from(vec![])).into(); + let expected_bound: BoundRange = + (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into(); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); + + let bound: BoundRange = (Key::from(vec![])..=Key::from(vec![])).into(); + let expected_bound: BoundRange = + (Key::from(vec![b'r', 0, 0xDE, 0xAD])..Key::from(vec![b'r', 0, 0xDE, 0xAE])).into(); + assert_eq!(bound.encode_keyspace(keyspace, key_mode), expected_bound); + + let mutation = Mutation::Put(Key::from(vec![0xBE, 0xEF]), vec![4, 5, 6]); + let expected_mutation = Mutation::Put( + Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]), + vec![4, 5, 6], + ); + assert_eq!( + mutation.encode_keyspace(keyspace, key_mode), + expected_mutation + ); + + let mutation = Mutation::Delete(Key::from(vec![0xBE, 0xEF])); + let expected_mutation = Mutation::Delete(Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF])); + assert_eq!( + mutation.encode_keyspace(keyspace, key_mode), + expected_mutation + ); + } + + #[test] + fn test_truncate_version() { + let key = Key::from(vec![b'r', 0, 0xDE, 0xAD, 0xBE, 0xEF]); + let keyspace = Keyspace::Enable { + keyspace_id: 0xDEAD, + }; + let expected_key = Key::from(vec![0xBE, 0xEF]); + assert_eq!(key.truncate_keyspace(keyspace), expected_key); + + let key = Key::from(vec![b'x', 0, 0xDE, 0xAD, 0xBE, 0xEF]); + let keyspace = Keyspace::Enable { + keyspace_id: 0xDEAD, + }; + let expected_key = Key::from(vec![0xBE, 0xEF]); + assert_eq!(key.truncate_keyspace(keyspace), expected_key); + } +} diff --git a/src/request/mod.rs b/src/request/mod.rs index 8c3a45cb..14de8e90 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -3,6 +3,10 @@ use async_trait::async_trait; use derive_new::new; +pub use self::keyspace::EncodeKeyspace; +pub use self::keyspace::KeyMode; +pub use self::keyspace::Keyspace; +pub use self::keyspace::TruncateKeyspace; pub use self::plan::Collect; pub use self::plan::CollectError; pub use self::plan::CollectSingle; @@ -33,7 +37,7 @@ use crate::store::Request; use crate::store::{HasKeyErrors, Store}; use crate::transaction::HasLocks; -pub mod codec; +mod keyspace; pub mod plan; mod plan_builder; mod shard; @@ -43,9 +47,6 @@ mod shard; pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { /// The expected response to the request. type Response: HasKeyErrors + HasLocks + Clone + Send + 'static; - - // TODO: fn encode_request() - // TODO: fn decode_response() } /// For requests or plans which are handled at TiKV store (other than region) level. @@ -98,12 +99,10 @@ mod test { use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; - use crate::pd::PdClient; use crate::proto::kvrpcpb; - use crate::proto::kvrpcpb::ApiVersion; use crate::proto::pdpb::Timestamp; use crate::proto::tikvpb::tikv_client::TikvClient; - use crate::request::codec::EncodedRequest; + use crate::region::RegionWithLeader; use crate::store::store_stream_for_keys; use crate::store::HasRegionError; use crate::transaction::lowering::new_commit_request; @@ -113,7 +112,7 @@ mod test { #[tokio::test] async fn test_region_retry() { - #[derive(Clone)] + #[derive(Debug, Clone)] struct MockRpcResponse; impl HasKeyErrors for MockRpcResponse { @@ -149,11 +148,11 @@ mod test { self } - fn set_context(&mut self, _: kvrpcpb::Context) { - unreachable!(); + fn set_leader(&mut self, _: &RegionWithLeader) -> Result<()> { + Ok(()) } - fn set_api_version(&mut self, _api_version: ApiVersion) {} + fn set_api_version(&mut self, _: kvrpcpb::ApiVersion) {} } #[async_trait] @@ -199,9 +198,8 @@ mod test { |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box), ))); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3)) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, request) + .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3), Keyspace::Disable) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); @@ -224,18 +222,18 @@ mod test { let key: Key = "key".to_owned().into(); let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default()); - let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); // does not extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone()) - .resolve_lock(OPTIMISTIC_BACKOFF) - .retry_multi_region(OPTIMISTIC_BACKOFF) - .plan(); + let plan = + crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req.clone()) + .resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable) + .retry_multi_region(OPTIMISTIC_BACKOFF) + .plan(); assert!(plan.execute().await.is_ok()); // extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .resolve_lock(OPTIMISTIC_BACKOFF) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req) + .resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); diff --git a/src/request/plan.rs b/src/request/plan.rs index ab72e8aa..06acbe47 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -35,6 +35,8 @@ use crate::util::iter::FlatMapOkIterExt; use crate::Error; use crate::Result; +use super::keyspace::Keyspace; + /// A plan for how to execute a request. A user builds up a plan with various /// options, then exectutes it. #[async_trait] @@ -546,6 +548,7 @@ pub struct ResolveLock { pub inner: P, pub pd_client: Arc, pub backoff: Backoff, + pub keyspace: Keyspace, } impl Clone for ResolveLock { @@ -554,6 +557,7 @@ impl Clone for ResolveLock { inner: self.inner.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), + keyspace: self.keyspace, } } } @@ -579,7 +583,7 @@ where } let pd_client = self.pd_client.clone(); - let live_locks = resolve_locks(locks, pd_client.clone()).await?; + let live_locks = resolve_locks(locks, pd_client.clone(), self.keyspace).await?; if live_locks.is_empty() { result = self.inner.execute().await?; } else { @@ -595,7 +599,7 @@ where } } -#[derive(Default)] +#[derive(Debug, Default)] pub struct CleanupLocksResult { pub region_error: Option, pub key_error: Option>, @@ -644,6 +648,7 @@ pub struct CleanupLocks { pub options: ResolveLocksOptions, pub store: Option, pub pd_client: Arc, + pub keyspace: Keyspace, pub backoff: Backoff, } @@ -655,6 +660,7 @@ impl Clone for CleanupLocks { options: self.options, store: None, pd_client: self.pd_client.clone(), + keyspace: self.keyspace, backoff: self.backoff.clone(), } } @@ -715,7 +721,12 @@ where let lock_size = locks.len(); match lock_resolver - .cleanup_locks(self.store.clone().unwrap(), locks, self.pd_client.clone()) + .cleanup_locks( + self.store.clone().unwrap(), + locks, + self.pd_client.clone(), + self.keyspace, + ) .await { Ok(()) => { @@ -891,6 +902,7 @@ mod test { inner: ErrPlan, backoff: Backoff::no_backoff(), pd_client: Arc::new(MockPdClient::default()), + keyspace: Keyspace::Disable, }, pd_client: Arc::new(MockPdClient::default()), backoff: Backoff::no_backoff(), diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 8e2329e7..c117d14b 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -4,9 +4,9 @@ use std::marker::PhantomData; use std::sync::Arc; use super::plan::PreserveShard; +use super::Keyspace; use crate::backoff::Backoff; use crate::pd::PdClient; -use crate::request::codec::EncodedRequest; use crate::request::plan::{CleanupLocks, RetryableAllStores}; use crate::request::shard::HasNextBatch; use crate::request::Dispatch; @@ -47,11 +47,12 @@ pub struct Targetted; impl PlanBuilderPhase for Targetted {} impl PlanBuilder, NoTarget> { - pub fn new(pd_client: Arc, encoded_request: EncodedRequest) -> Self { + pub fn new(pd_client: Arc, keyspace: Keyspace, mut request: Req) -> Self { + request.set_api_version(keyspace.api_version()); PlanBuilder { pd_client, plan: Dispatch { - request: encoded_request.inner, + request, kv_client: None, }, phantom: PhantomData, @@ -69,7 +70,11 @@ impl PlanBuilder { impl PlanBuilder { /// If there is a lock error, then resolve the lock and retry the request. - pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder, Ph> + pub fn resolve_lock( + self, + backoff: Backoff, + keyspace: Keyspace, + ) -> PlanBuilder, Ph> where P::Result: HasLocks, { @@ -79,6 +84,7 @@ impl PlanBuilder { inner: self.plan, backoff, pd_client: self.pd_client, + keyspace, }, phantom: PhantomData, } @@ -89,6 +95,7 @@ impl PlanBuilder { ctx: ResolveLocksContext, options: ResolveLocksOptions, backoff: Backoff, + keyspace: Keyspace, ) -> PlanBuilder, Ph> where P: Shardable + NextBatch, @@ -103,6 +110,7 @@ impl PlanBuilder { store: None, backoff, pd_client: self.pd_client, + keyspace, }, phantom: PhantomData, } @@ -248,8 +256,7 @@ fn set_single_region_store( store: RegionStore, pd_client: Arc, ) -> Result, Targetted>> { - plan.request - .set_context(store.region_with_leader.context()?); + plan.request.set_leader(&store.region_with_leader)?; plan.kv_client = Some(store.client); Ok(PlanBuilder { plan, diff --git a/src/request/shard.rs b/src/request/shard.rs index ec234239..1f116f76 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -14,6 +14,7 @@ use crate::request::ResolveLock; use crate::store::RegionStore; use crate::store::Request; use crate::Result; +use std::fmt::Debug; macro_rules! impl_inner_shardable { () => { @@ -33,7 +34,7 @@ macro_rules! impl_inner_shardable { } pub trait Shardable { - type Shard: Clone + Send + Sync; + type Shard: Debug + Clone + Send + Sync; fn shards( &self, @@ -164,7 +165,7 @@ macro_rules! shardable_key { mut shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; assert!(shard.len() == 1); self.key = shard.pop().unwrap(); Ok(()) @@ -197,7 +198,7 @@ macro_rules! shardable_keys { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -257,12 +258,12 @@ macro_rules! shardable_range { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. - self.start_key = shard.0.into(); - self.end_key = shard.1.into(); + self.start_key = shard.0; + self.end_key = shard.1; if self.is_reverse() { std::mem::swap(&mut self.start_key, &mut self.end_key); } diff --git a/src/store/request.rs b/src/store/request.rs index e11fc8f1..4aed90d6 100644 --- a/src/store/request.rs +++ b/src/store/request.rs @@ -9,6 +9,7 @@ use tonic::IntoRequest; use crate::proto::kvrpcpb; use crate::proto::tikvpb::tikv_client::TikvClient; +use crate::store::RegionWithLeader; use crate::Error; use crate::Result; @@ -21,9 +22,7 @@ pub trait Request: Any + Sync + Send + 'static { ) -> Result>; fn label(&self) -> &'static str; fn as_any(&self) -> &dyn Any; - /// Set the context for the request. - /// Should always use `set_context` other than modify the `self.context` directly. - fn set_context(&mut self, context: kvrpcpb::Context); + fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()>; fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion); } @@ -54,19 +53,20 @@ macro_rules! impl_request { self } - fn set_context(&mut self, context: kvrpcpb::Context) { - let api_version = self - .context - .as_ref() - .map(|c| c.api_version) - .unwrap_or_default(); - self.context = Some(context); - self.set_api_version(kvrpcpb::ApiVersion::try_from(api_version).unwrap()); + fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> { + let ctx = self.context.get_or_insert(kvrpcpb::Context::default()); + let leader_peer = leader.leader.as_ref().ok_or(Error::LeaderNotFound { + region_id: leader.region.id, + })?; + ctx.region_id = leader.region.id; + ctx.region_epoch = leader.region.region_epoch.clone(); + ctx.peer = Some(leader_peer.clone()); + Ok(()) } fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { - let context = self.context.get_or_insert(kvrpcpb::Context::default()); - context.api_version = api_version.into(); + let ctx = self.context.get_or_insert(kvrpcpb::Context::default()); + ctx.api_version = api_version.into(); } } }; diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 202b3665..7090ebd4 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -12,6 +12,8 @@ use crate::KvPair; use crate::Result; use crate::Value; +use super::transaction::Mutation; + /// A caching layer which buffers reads and writes in a transaction. pub struct Buffer { primary_key: Option, @@ -244,12 +246,10 @@ impl Buffer { } } - pub(crate) fn mutate(&mut self, m: kvrpcpb::Mutation) { - let op = kvrpcpb::Op::try_from(m.op).unwrap(); - match op { - kvrpcpb::Op::Put => self.put(m.key.into(), m.value), - kvrpcpb::Op::Del => self.delete(m.key.into()), - _ => unimplemented!("only put and delete are supported in mutate"), + pub(crate) fn mutate(&mut self, m: Mutation) { + match m { + Mutation::Put(key, value) => self.put(key, value), + Mutation::Delete(key) => self.delete(key), }; } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 4bcb16d9..eb9cc91b 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -10,8 +10,10 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; -use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest}; use crate::request::plan::CleanupLocksResult; +use crate::request::EncodeKeyspace; +use crate::request::KeyMode; +use crate::request::Keyspace; use crate::request::Plan; use crate::timestamp::TimestampExt; use crate::transaction::lock::ResolveLocksOptions; @@ -44,19 +46,21 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client { - pd: Arc>, +pub struct Client { + pd: Arc, + keyspace: Keyspace, } -impl Clone for Client { +impl Clone for Client { fn clone(&self) -> Self { Self { pd: self.pd.clone(), + keyspace: self.keyspace, } } } -impl Client { +impl Client { /// Create a transactional [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -73,6 +77,7 @@ impl Client { /// # }); /// ``` pub async fn new>(pd_endpoints: Vec) -> Result { + // debug!("creating transactional client"); Self::new_with_config(pd_endpoints, Config::default()).await } @@ -101,36 +106,17 @@ impl Client { pd_endpoints: Vec, config: Config, ) -> Result { - Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await - } -} - -impl Client { - pub async fn new_with_config_v2>( - _keyspace_name: &str, - pd_endpoints: Vec, - config: Config, - ) -> Result> { - debug!("creating new transactional client APIv2"); - let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?; - let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name) - pd.set_codec(ApiV2TxnCodec::new(keyspace_id)); - Ok(Client { pd: Arc::new(pd) }) - } -} - -impl Client { - pub async fn new_with_codec>( - pd_endpoints: Vec, - config: Config, - codec: Cod, - ) -> Result> { debug!("creating new transactional client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let pd = - Arc::new(PdRpcClient::::connect(&pd_endpoints, config, true, Some(codec)).await?); - Ok(Client { pd }) + let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config.clone(), true).await?); + let keyspace = match config.keyspace { + Some(keyspace) => { + let keyspace_id = pd.get_keyspace_id(&keyspace).await?; + Keyspace::Enable { keyspace_id } + } + None => Keyspace::Disable, + }; + Ok(Client { pd, keyspace }) } /// Creates a new optimistic [`Transaction`]. @@ -153,7 +139,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_optimistic(&self) -> Result>> { + pub async fn begin_optimistic(&self) -> Result { debug!("creating new optimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic())) @@ -176,7 +162,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_pessimistic(&self) -> Result>> { + pub async fn begin_pessimistic(&self) -> Result { debug!("creating new pessimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic())) @@ -199,21 +185,14 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_with_options( - &self, - options: TransactionOptions, - ) -> Result>> { + pub async fn begin_with_options(&self, options: TransactionOptions) -> Result { debug!("creating new customized transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, options)) } /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). - pub fn snapshot( - &self, - timestamp: Timestamp, - options: TransactionOptions, - ) -> Snapshot> { + pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { debug!("creating new snapshot"); Snapshot::new(self.new_transaction(timestamp, options.read_only())) } @@ -279,10 +258,10 @@ impl Client { // scan all locks with ts <= safepoint let ctx = ResolveLocksContext::default(); let backoff = Backoff::equal_jitter_backoff(100, 10000, 50); - let req = new_scan_lock_request(range.into(), safepoint, options.batch_size); - let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) - .cleanup_locks(ctx.clone(), options, backoff) + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); + let req = new_scan_lock_request(range, safepoint, options.batch_size); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req) + .cleanup_locks(ctx.clone(), options, backoff, self.keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .merge(crate::request::Collect) @@ -299,13 +278,15 @@ impl Client { range: impl Into, batch_size: u32, ) -> Result> { - let req = new_scan_lock_request(range.into(), safepoint, batch_size); - let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + use crate::request::TruncateKeyspace; + + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); + let req = new_scan_lock_request(range, safepoint, batch_size); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(crate::request::Collect) .plan(); - plan.execute().await + Ok(plan.execute().await?.truncate_keyspace(self.keyspace)) } /// Cleans up all keys in a range and quickly reclaim disk space. @@ -316,20 +297,16 @@ impl Client { /// /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB. pub async fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { - let req = new_unsafe_destroy_range_request(range.into()); - let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); + let req = new_unsafe_destroy_range_request(range); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), self.keyspace, req) .all_stores(DEFAULT_STORE_BACKOFF) .merge(crate::request::Collect) .plan(); plan.execute().await } - fn new_transaction( - &self, - timestamp: Timestamp, - options: TransactionOptions, - ) -> Transaction> { - Transaction::new(timestamp, self.pd.clone(), options) + fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { + Transaction::new(timestamp, self.pd.clone(), options, self.keyspace) } } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index afb1d6c4..efa835d6 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -13,13 +13,14 @@ use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::backoff::OPTIMISTIC_BACKOFF; use crate::pd::PdClient; + use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::pdpb::Timestamp; use crate::region::RegionVerId; -use crate::request::codec::EncodedRequest; use crate::request::Collect; use crate::request::CollectSingle; +use crate::request::Keyspace; use crate::request::Plan; use crate::store::RegionStore; use crate::timestamp::TimestampExt; @@ -44,6 +45,7 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; pub async fn resolve_locks( locks: Vec, pd_client: Arc, + keyspace: Keyspace, ) -> Result /* live_locks */> { debug!("resolving locks"); let ts = pd_client.clone().get_timestamp().await?; @@ -76,9 +78,8 @@ pub async fn resolve_locks( Some(&commit_version) => commit_version, None => { let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .resolve_lock(OPTIMISTIC_BACKOFF) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) + .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .post_process_default() @@ -94,6 +95,7 @@ pub async fn resolve_locks( lock.lock_version, commit_version, pd_client.clone(), + keyspace, ) .await?; clean_regions @@ -109,6 +111,7 @@ async fn resolve_lock_with_retry( start_version: u64, commit_version: u64, pd_client: Arc, + keyspace: Keyspace, ) -> Result { debug!("resolving locks with retry"); // FIXME: Add backoff @@ -118,11 +121,10 @@ async fn resolve_lock_with_retry( let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) .single_region_with_store(store) .await? - .resolve_lock(Backoff::no_backoff()) + .resolve_lock(Backoff::no_backoff(), keyspace) .extract_error() .plan(); match plan.execute().await { @@ -214,6 +216,7 @@ impl LockResolver { store: RegionStore, locks: Vec, pd_client: Arc, // TODO: make pd_client a member of LockResolver + keyspace: Keyspace, ) -> Result<()> { if locks.is_empty() { return Ok(()); @@ -235,6 +238,7 @@ impl LockResolver { let mut status = self .check_txn_status( pd_client.clone(), + keyspace, txn_id, l.primary_lock.clone(), 0, @@ -249,7 +253,12 @@ impl LockResolver { // Then we need to check the secondary locks to determine the final status of the transaction. if let TransactionStatusKind::Locked(_, lock_info) = &status.kind { let secondary_status = self - .check_all_secondaries(pd_client.clone(), lock_info.secondaries.clone(), txn_id) + .check_all_secondaries( + pd_client.clone(), + keyspace, + lock_info.secondaries.clone(), + txn_id, + ) .await?; debug!( "secondary status, txn_id:{}, commit_ts:{:?}, min_commit_version:{}, fallback_2pc:{}", @@ -267,6 +276,7 @@ impl LockResolver { status = self .check_txn_status( pd_client.clone(), + keyspace, txn_id, l.primary_lock, 0, @@ -315,7 +325,7 @@ impl LockResolver { txn_info_vec.push(txn_info); } let cleaned_region = self - .batch_resolve_locks(pd_client.clone(), store.clone(), txn_info_vec) + .batch_resolve_locks(pd_client.clone(), keyspace, store.clone(), txn_info_vec) .await?; for txn_id in txn_ids { self.ctx @@ -330,6 +340,7 @@ impl LockResolver { pub async fn check_txn_status( &mut self, pd_client: Arc, + keyspace: Keyspace, txn_id: u64, primary: Vec, caller_start_ts: u64, @@ -359,8 +370,7 @@ impl LockResolver { force_sync_commit, resolving_pessimistic_lock, ); - let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .extract_error() @@ -380,12 +390,12 @@ impl LockResolver { async fn check_all_secondaries( &mut self, pd_client: Arc, + keyspace: Keyspace, keys: Vec>, txn_id: u64, ) -> Result { let req = new_check_secondary_locks_request(keys, txn_id); - let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .merge(Collect) @@ -396,13 +406,13 @@ impl LockResolver { async fn batch_resolve_locks( &mut self, pd_client: Arc, + keyspace: Keyspace, store: RegionStore, txn_infos: Vec, ) -> Result { let ver_id = store.region_with_leader.ver_id(); let request = requests::new_batch_resolve_lock_request(txn_infos.clone()); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) .single_region_with_store(store.clone()) .await? .extract_error() @@ -422,13 +432,19 @@ pub trait HasLocks { mod tests { use std::any::Any; + use serial_test::serial; + use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::errorpb; + #[rstest::rstest] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] - async fn test_resolve_lock_with_retry() { + #[serial] + async fn test_resolve_lock_with_retry(#[case] keyspace: Keyspace) { // Test resolve lock within retry limit fail::cfg("region-error", "9*return").unwrap(); @@ -447,7 +463,7 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone()) + let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone(), keyspace) .await .unwrap(); assert_eq!(region1.ver_id(), resolved_region); @@ -455,7 +471,7 @@ mod tests { // Test resolve lock over retry limit fail::cfg("region-error", "10*return").unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, client) + resolve_lock_with_retry(&key, 3, 4, client, keyspace) .await .expect_err("should return error"); } diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 81a290fa..5bc8f0e4 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -15,6 +15,7 @@ pub use snapshot::Snapshot; pub use transaction::CheckLevel; #[doc(hidden)] pub use transaction::HeartbeatOption; +pub use transaction::Mutation; pub use transaction::Transaction; pub use transaction::TransactionOptions; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 4f3e1b93..dfada4d7 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -298,7 +298,7 @@ impl Shardable for kvrpcpb::PrewriteRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { @@ -365,7 +365,7 @@ impl Shardable for kvrpcpb::CommitRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -456,7 +456,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.mutations = shard; Ok(()) } @@ -557,7 +557,7 @@ impl Shardable for kvrpcpb::ScanLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; self.start_key = shard.0; Ok(()) } @@ -618,7 +618,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; assert!(shard.len() == 1); self.primary_lock = shard.pop().unwrap(); Ok(()) @@ -676,7 +676,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.set_leader(&store.region_with_leader)?; assert!(shard.len() == 1); self.primary_key = shard.pop().unwrap(); Ok(()) diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index 0d1e4803..5694614b 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -2,11 +2,7 @@ use derive_new::new; use log::debug; -use std::marker::PhantomData; -use crate::codec::ApiV1TxnCodec; -use crate::pd::{PdClient, PdRpcClient}; -use crate::request::codec::Codec; use crate::BoundRange; use crate::Key; use crate::KvPair; @@ -22,12 +18,11 @@ use crate::Value; /// /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods. #[derive(new)] -pub struct Snapshot = PdRpcClient> { - transaction: Transaction, - phantom: PhantomData, +pub struct Snapshot { + transaction: Transaction, } -impl> Snapshot { +impl Snapshot { /// Get the value associated with the given key. pub async fn get(&mut self, key: impl Into) -> Result> { debug!("invoking get request on snapshot"); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e984f153..cae46179 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,7 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::iter; -use std::marker::PhantomData; use std::sync::atomic; use std::sync::atomic::AtomicU8; use std::sync::Arc; @@ -16,19 +15,21 @@ use tokio::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; -use crate::codec::ApiV1TxnCodec; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; -use crate::request::codec::{Codec, EncodedRequest}; use crate::request::Collect; use crate::request::CollectError; use crate::request::CollectSingle; use crate::request::CollectWithShard; +use crate::request::EncodeKeyspace; +use crate::request::KeyMode; +use crate::request::Keyspace; use crate::request::Plan; use crate::request::PlanBuilder; use crate::request::RetryOptions; +use crate::request::TruncateKeyspace; use crate::timestamp::TimestampExt; use crate::transaction::buffer::Buffer; use crate::transaction::lowering::*; @@ -77,23 +78,24 @@ use crate::Value; /// txn.commit().await.unwrap(); /// # }); /// ``` -pub struct Transaction = PdRpcClient> { +pub struct Transaction { status: Arc, timestamp: Timestamp, buffer: Buffer, rpc: Arc, options: TransactionOptions, + keyspace: Keyspace, is_heartbeat_started: bool, start_instant: Instant, - phantom: PhantomData, } -impl> Transaction { +impl Transaction { pub(crate) fn new( timestamp: Timestamp, rpc: Arc, options: TransactionOptions, - ) -> Transaction { + keyspace: Keyspace, + ) -> Transaction { let status = if options.read_only { TransactionStatus::ReadOnly } else { @@ -105,9 +107,9 @@ impl> Transaction { buffer: Buffer::new(options.is_pessimistic()), rpc, options, + keyspace, is_heartbeat_started: false, start_instant: std::time::Instant::now(), - phantom: PhantomData, } } @@ -134,15 +136,15 @@ impl> Transaction { self.check_allow_operation().await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); - let key = key.into(); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); let retry_options = self.options.retry_options.clone(); + let keyspace = self.keyspace; self.buffer .get_or_else(key, |key| async move { let request = new_get_request(key, timestamp); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc, encoded_req) - .resolve_lock(retry_options.lock_backoff) + let plan = PlanBuilder::new(rpc, keyspace, request) + .resolve_lock(retry_options.lock_backoff, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .post_process_default() @@ -202,7 +204,8 @@ impl> Transaction { self.lock_keys(iter::once(key.clone())).await?; self.get(key).await } else { - let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?; + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); + let mut pairs = self.pessimistic_lock(iter::once(key), true).await?; debug_assert!(pairs.len() <= 1); match pairs.pop() { Some(pair) => Ok(Some(pair.1)), @@ -266,14 +269,17 @@ impl> Transaction { self.check_allow_operation().await?; let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); + let keyspace = self.keyspace; + let keys = keys + .into_iter() + .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn)); let retry_options = self.options.retry_options.clone(); self.buffer - .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move { + .batch_get_or_else(keys, move |keys| async move { let request = new_batch_get_request(keys, timestamp); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc, encoded_req) - .resolve_lock(retry_options.lock_backoff) + let plan = PlanBuilder::new(rpc, keyspace, request) + .resolve_lock(retry_options.lock_backoff, keyspace) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -282,6 +288,7 @@ impl> Transaction { .map(|r| r.into_iter().map(Into::into).collect()) }) .await + .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace))) } /// Create a new 'batch get for update' request. @@ -317,12 +324,20 @@ impl> Transaction { ) -> Result> { debug!("invoking transactional batch_get_for_update request"); self.check_allow_operation().await?; - let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); if !self.is_pessimistic() { + let keys: Vec = keys.into_iter().map(|k| k.into()).collect(); self.lock_keys(keys.clone()).await?; Ok(self.batch_get(keys).await?.collect()) } else { - self.pessimistic_lock(keys, true).await + let keyspace = self.keyspace; + let keys = keys + .into_iter() + .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn)); + let pairs = self + .pessimistic_lock(keys, true) + .await? + .truncate_keyspace(keyspace); + Ok(pairs) } } @@ -448,7 +463,7 @@ impl> Transaction { pub async fn put(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking transactional put request"); self.check_allow_operation().await?; - let key = key.into(); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) .await?; @@ -479,7 +494,7 @@ impl> Transaction { pub async fn insert(&mut self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking transactional insert request"); self.check_allow_operation().await?; - let key = key.into(); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); if self.buffer.get(&key).is_some() { return Err(Error::DuplicateKeyInsertion); } @@ -514,7 +529,7 @@ impl> Transaction { pub async fn delete(&mut self, key: impl Into) -> Result<()> { debug!("invoking transactional delete request"); self.check_allow_operation().await?; - let key = key.into(); + let key = key.into().encode_keyspace(self.keyspace, KeyMode::Txn); if self.is_pessimistic() { self.pessimistic_lock(iter::once(key.clone()), false) .await?; @@ -530,23 +545,14 @@ impl> Transaction { /// # Examples /// /// ```rust,no_run - /// # use tikv_client::{Key, Config, TransactionClient, proto::kvrpcpb}; + /// # use tikv_client::{Key, Config, TransactionClient, transaction::Mutation}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap(); /// let mut txn = client.begin_optimistic().await.unwrap(); /// let mutations = vec![ - /// kvrpcpb::Mutation { - /// op: kvrpcpb::Op::Del.into(), - /// key: b"k0".to_vec(), - /// ..Default::default() - /// }, - /// kvrpcpb::Mutation { - /// op: kvrpcpb::Op::Put.into(), - /// key: b"k1".to_vec(), - /// value: b"v1".to_vec(), - /// ..Default::default() - /// }, + /// Mutation::Delete("k0".to_owned().into()), + /// Mutation::Put("k1".to_owned().into(), b"v1".to_vec()), /// ]; /// txn.batch_mutate(mutations).await.unwrap(); /// txn.commit().await.unwrap(); @@ -554,13 +560,16 @@ impl> Transaction { /// ``` pub async fn batch_mutate( &mut self, - mutations: impl IntoIterator, + mutations: impl IntoIterator, ) -> Result<()> { debug!("invoking transactional batch mutate request"); self.check_allow_operation().await?; + let mutations: Vec = mutations + .into_iter() + .map(|mutation| mutation.encode_keyspace(self.keyspace, KeyMode::Txn)) + .collect(); if self.is_pessimistic() { - let mutations: Vec = mutations.into_iter().collect(); - self.pessimistic_lock(mutations.iter().map(|m| Key::from(m.key.clone())), false) + self.pessimistic_lock(mutations.iter().map(|m| m.key().clone()), false) .await?; for m in mutations { self.buffer.mutate(m); @@ -602,15 +611,18 @@ impl> Transaction { ) -> Result<()> { debug!("invoking transactional lock_keys request"); self.check_allow_operation().await?; + let keyspace = self.keyspace; + let keys = keys + .into_iter() + .map(move |k| k.into().encode_keyspace(keyspace, KeyMode::Txn)); match self.options.kind { TransactionKind::Optimistic => { for key in keys { - self.buffer.lock(key.into()); + self.buffer.lock(key); } } TransactionKind::Pessimistic(_) => { - self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false) - .await?; + self.pessimistic_lock(keys, false).await?; } } Ok(()) @@ -660,6 +672,7 @@ impl> Transaction { self.timestamp.clone(), self.rpc.clone(), self.options.clone(), + self.keyspace, self.buffer.get_write_size() as u64, self.start_instant, ) @@ -712,6 +725,7 @@ impl> Transaction { self.timestamp.clone(), self.rpc.clone(), self.options.clone(), + self.keyspace, self.buffer.get_write_size() as u64, self.start_instant, ) @@ -745,10 +759,13 @@ impl> Transaction { primary_key, self.start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) + .resolve_lock( + self.options.retry_options.lock_backoff.clone(), + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) + .extract_error() .merge(CollectSingle) .post_process_default() .plan(); @@ -766,19 +783,20 @@ impl> Transaction { let timestamp = self.timestamp.clone(); let rpc = self.rpc.clone(); let retry_options = self.options.retry_options.clone(); + let keyspace = self.keyspace; + let range = range.into().encode_keyspace(self.keyspace, KeyMode::Txn); self.buffer .scan_and_fetch( - range.into(), + range, limit, !key_only, reverse, move |new_range, new_limit| async move { let request = new_scan_request(new_range, timestamp, new_limit, key_only, reverse); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc, encoded_req) - .resolve_lock(retry_options.lock_backoff) + let plan = PlanBuilder::new(rpc, keyspace, request) + .resolve_lock(retry_options.lock_backoff, keyspace) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -788,6 +806,7 @@ impl> Transaction { }, ) .await + .map(move |pairs| pairs.map(move |pair| pair.truncate_keyspace(keyspace))) } /// Pessimistically lock the keys, and optionally retrieve corresponding values. @@ -832,9 +851,11 @@ impl> Transaction { for_update_ts.clone(), need_value, ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) + .resolve_lock( + self.options.retry_options.lock_backoff.clone(), + self.keyspace, + ) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) .merge(CollectWithShard) @@ -887,9 +908,11 @@ impl> Transaction { start_version, for_update_ts, ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req) + .resolve_lock( + self.options.retry_options.lock_backoff.clone(), + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() .plan(); @@ -937,6 +960,7 @@ impl> Transaction { HeartbeatOption::FixedTime(heartbeat_interval) => heartbeat_interval, }; let start_instant = self.start_instant; + let keyspace = self.keyspace; let heartbeat_task = async move { loop { @@ -957,8 +981,7 @@ impl> Transaction { primary_key.clone(), start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc.clone(), encoded_req) + let plan = PlanBuilder::new(rpc.clone(), keyspace, request) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) .plan(); @@ -1005,7 +1028,7 @@ impl> Transaction { } } -impl> Drop for Transaction { +impl Drop for Transaction { fn drop(&mut self) { debug!("dropping transaction"); if std::thread::panicking() { @@ -1204,6 +1227,21 @@ impl HeartbeatOption { } } +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum Mutation { + Put(Key, Value), + Delete(Key), +} + +impl Mutation { + pub fn key(&self) -> &Key { + match self { + Mutation::Put(key, _) => key, + Mutation::Delete(key) => key, + } + } +} + /// A struct wrapping the details of two-phase commit protocol (2PC). /// /// The two phases are `prewrite` and `commit`. @@ -1219,6 +1257,7 @@ struct Committer { start_version: Timestamp, rpc: Arc, options: TransactionOptions, + keyspace: Keyspace, #[new(default)] undetermined: bool, write_size: u64, @@ -1296,9 +1335,11 @@ impl Committer { .collect(); // FIXME set max_commit_ts and min_commit_ts - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) + .resolve_lock( + self.options.retry_options.lock_backoff.clone(), + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) .extract_error() @@ -1337,9 +1378,11 @@ impl Committer { self.start_version.clone(), commit_version.clone(), ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req) + .resolve_lock( + self.options.retry_options.lock_backoff.clone(), + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() .plan(); @@ -1402,9 +1445,8 @@ impl Committer { .filter(|key| &primary_key != key); new_commit_request(keys, self.start_version, commit_version) }; - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc, encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff) + let plan = PlanBuilder::new(self.rpc, self.keyspace, req) + .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1424,9 +1466,8 @@ impl Committer { match self.options.kind { TransactionKind::Optimistic => { let req = new_batch_rollback_request(keys, self.start_version); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc, encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff) + let plan = PlanBuilder::new(self.rpc, self.keyspace, req) + .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1434,9 +1475,8 @@ impl Committer { } TransactionKind::Pessimistic(for_update_ts) => { let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc, encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff) + let plan = PlanBuilder::new(self.rpc, self.keyspace, req) + .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1506,12 +1546,16 @@ mod tests { use crate::mock::MockPdClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; + use crate::request::Keyspace; use crate::transaction::HeartbeatOption; use crate::Transaction; use crate::TransactionOptions; + #[rstest::rstest] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] - async fn test_optimistic_heartbeat() -> Result<(), io::Error> { + async fn test_optimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> { let scenario = FailScenario::setup(); fail::cfg("after-prewrite", "sleep(1500)").unwrap(); let heartbeats = Arc::new(AtomicUsize::new(0)); @@ -1534,6 +1578,7 @@ mod tests { pd_client, TransactionOptions::new_optimistic() .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))), + keyspace, ); heartbeat_txn.put(key1.clone(), "foo").await.unwrap(); let heartbeat_txn_handle = tokio::task::spawn_blocking(move || { @@ -1546,8 +1591,11 @@ mod tests { Ok(()) } + #[rstest::rstest] + #[case(Keyspace::Disable)] + #[case(Keyspace::Enable { keyspace_id: 0 })] #[tokio::test] - async fn test_pessimistic_heartbeat() -> Result<(), io::Error> { + async fn test_pessimistic_heartbeat(#[case] keyspace: Keyspace) -> Result<(), io::Error> { let heartbeats = Arc::new(AtomicUsize::new(0)); let heartbeats_cloned = heartbeats.clone(); let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook( @@ -1573,6 +1621,7 @@ mod tests { pd_client, TransactionOptions::new_pessimistic() .heartbeat_option(HeartbeatOption::FixedTime(Duration::from_secs(1))), + keyspace, ); heartbeat_txn.put(key1.clone(), "foo").await.unwrap(); assert_eq!(heartbeats.load(Ordering::SeqCst), 0); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4d63dd56..9a32619b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,20 +2,20 @@ mod ctl; +use log::info; +use log::warn; +use rand::Rng; use std::collections::HashSet; use std::convert::TryInto; use std::env; use std::time::Duration; - -use log::info; -use log::warn; -use rand::Rng; +use tikv_client::Config; use tikv_client::Key; use tikv_client::RawClient; use tikv_client::Result; use tikv_client::Transaction; use tikv_client::TransactionClient; -use tikv_client::{ColumnFamily, Snapshot, TransactionOptions}; +use tikv_client::{Snapshot, TransactionOptions}; use tokio::time::sleep; const ENV_PD_ADDRS: &str = "PD_ADDRS"; @@ -24,21 +24,23 @@ const REGION_SPLIT_TIME_LIMIT: Duration = Duration::from_secs(15); // Delete all entries in TiKV to leave a clean space for following tests. pub async fn clear_tikv() { - let cfs = vec![ - ColumnFamily::Default, - ColumnFamily::Lock, - ColumnFamily::Write, - ]; // DEFAULT_REGION_BACKOFF is not long enough for CI environment. So set a longer backoff. let backoff = tikv_client::Backoff::no_jitter_backoff(100, 30000, 20); - for cf in cfs { - let raw_client = RawClient::new(pd_addrs()).await.unwrap().with_cf(cf); - raw_client - .with_backoff(backoff.clone()) - .delete_range(vec![]..) + let raw_client = + RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) .await .unwrap(); - } + raw_client + .with_backoff(backoff) + .delete_range(..) + .await + .unwrap(); + + let txn_client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await + .unwrap(); + txn_client.unsafe_destroy_range(..).await.unwrap(); } // To test with multiple regions, prewrite some data. Tests that hope to test @@ -78,14 +80,16 @@ async fn ensure_region_split( // 1. write plenty transactional keys // 2. wait until regions split - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut txn = client.begin_optimistic().await?; for key in keys.into_iter() { txn.put(key.into(), vec![0, 0, 0, 0]).await?; } txn.commit().await?; let mut txn = client.begin_optimistic().await?; - let _ = txn.scan(vec![].., 2048).await?; + let _ = txn.scan(.., 2048).await?; txn.commit().await?; info!("splitting regions..."); diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index f34dff48..d58b4016 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -17,6 +17,7 @@ use tikv_client::transaction::HeartbeatOption; use tikv_client::transaction::ResolveLocksOptions; use tikv_client::Backoff; use tikv_client::CheckLevel; +use tikv_client::Config; use tikv_client::Result; use tikv_client::RetryOptions; use tikv_client::TransactionClient; @@ -34,7 +35,9 @@ async fn txn_optimistic_heartbeat() -> Result<()> { let key1 = "key1".to_owned(); let key2 = "key2".to_owned(); - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // CheckLevel::Panic makes the case unstable, change to Warn level for now. // See https://github.com/tikv/client-rust/issues/389 @@ -110,7 +113,9 @@ async fn txn_cleanup_locks_batch_size() -> Result<()> { fail::cfg("before-cleanup-locks", "off").unwrap(); }} - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let keys = write_data(&client, true, true).await?; assert_eq!(count_locks(&client).await?, keys.len()); @@ -145,7 +150,11 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> { fail::cfg("after-prewrite", "off").unwrap() } - let client = TransactionClient::new(pd_addrs()).await?; + let client = TransactionClient::new_with_config( + pd_addrs(), + Config::default().with_default_keyspace(), + ) + .await?; let keys = write_data(&client, true, true).await?; assert_eq!(count_locks(&client).await?, keys.len()); @@ -171,7 +180,11 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> { fail::cfg("before-commit-secondary", "off").unwrap() } - let client = TransactionClient::new(pd_addrs()).await?; + let client = TransactionClient::new_with_config( + pd_addrs(), + Config::default().with_default_keyspace(), + ) + .await?; let keys = write_data(&client, true, false).await?; thread::sleep(Duration::from_secs(1)); // Wait for async commit to complete. assert_eq!(count_locks(&client).await?, keys.len() * percent / 100); @@ -192,7 +205,11 @@ async fn txn_cleanup_async_commit_locks() -> Result<()> { // all committed { info!("test all committed"); - let client = TransactionClient::new(pd_addrs()).await?; + let client = TransactionClient::new_with_config( + pd_addrs(), + Config::default().with_default_keyspace(), + ) + .await?; let keys = write_data(&client, true, false).await?; let safepoint = client.current_timestamp().await?; @@ -227,7 +244,9 @@ async fn txn_cleanup_range_async_commit_locks() -> Result<()> { fail::cfg("after-prewrite", "off").unwrap() } - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let keys = write_data(&client, true, true).await?; assert_eq!(count_locks(&client).await?, keys.len()); @@ -276,7 +295,11 @@ async fn txn_cleanup_2pc_locks() -> Result<()> { fail::cfg("after-prewrite", "off").unwrap() } - let client = TransactionClient::new(pd_addrs()).await?; + let client = TransactionClient::new_with_config( + pd_addrs(), + Config::default().with_default_keyspace(), + ) + .await?; let keys = write_data(&client, false, true).await?; assert_eq!(count_locks(&client).await?, keys.len()); @@ -306,7 +329,11 @@ async fn txn_cleanup_2pc_locks() -> Result<()> { // all committed { info!("test all committed"); - let client = TransactionClient::new(pd_addrs()).await?; + let client = TransactionClient::new_with_config( + pd_addrs(), + Config::default().with_default_keyspace(), + ) + .await?; let keys = write_data(&client, false, false).await?; assert_eq!(count_locks(&client).await?, 0); @@ -347,7 +374,7 @@ async fn must_rollbacked(client: &TransactionClient, keys: HashSet>) { async fn count_locks(client: &TransactionClient) -> Result { let ts = client.current_timestamp().await.unwrap(); - let locks = client.scan_locks(&ts, vec![].., 1024).await?; + let locks = client.scan_locks(&ts, .., 1024).await?; // De-duplicated as `scan_locks` will return duplicated locks due to retry on region changes. let locks_set: HashSet> = HashSet::from_iter(locks.into_iter().map(|l| l.key)); Ok(locks_set.len()) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 82442c4b..0f3bb0cf 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -12,18 +12,18 @@ //! requirements on the region boundaries. mod common; -use std::collections::HashMap; -use std::iter; - use common::*; use futures::prelude::*; use rand::seq::IteratorRandom; use rand::thread_rng; use rand::Rng; use serial_test::serial; +use std::collections::HashMap; +use std::iter; use tikv_client::backoff::DEFAULT_REGION_BACKOFF; -use tikv_client::proto::kvrpcpb; use tikv_client::transaction::HeartbeatOption; +use tikv_client::transaction::Mutation; +use tikv_client::Config; use tikv_client::Error; use tikv_client::Key; use tikv_client::KvPair; @@ -42,7 +42,9 @@ const NUM_TRNASFER: u32 = 100; #[serial] async fn txn_get_timestamp() -> Result<()> { const COUNT: usize = 1 << 8; // use a small number to make test fast - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut versions = future::join_all((0..COUNT).map(|_| client.current_timestamp())) .await @@ -63,7 +65,9 @@ async fn txn_get_timestamp() -> Result<()> { async fn txn_crud() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut txn = client.begin_optimistic().await?; // Get non-existent keys @@ -147,7 +151,9 @@ async fn txn_crud() -> Result<()> { async fn txn_insert_duplicate_keys() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // Initialize TiKV store with {foo => bar} let mut txn = client.begin_optimistic().await?; txn.put("foo".to_owned(), "bar".to_owned()).await?; @@ -171,7 +177,9 @@ async fn txn_insert_duplicate_keys() -> Result<()> { async fn txn_pessimistic() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut txn = client.begin_pessimistic().await?; txn.put("foo".to_owned(), "foo".to_owned()).await.unwrap(); @@ -188,7 +196,9 @@ async fn txn_pessimistic() -> Result<()> { async fn txn_split_batch() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut txn = client.begin_optimistic().await?; let mut rng = thread_rng(); @@ -226,7 +236,8 @@ async fn txn_split_batch() -> Result<()> { #[serial] async fn raw_bank_transfer() -> Result<()> { init().await?; - let client = RawClient::new(pd_addrs()).await?; + let client = + RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; let mut rng = thread_rng(); let people = gen_u32_keys(NUM_PEOPLE, &mut rng); @@ -278,7 +289,9 @@ async fn txn_read() -> Result<()> { let value = "large_value".repeat(10); init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; for i in 0..2u32.pow(NUM_BITS_TXN) { let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN); @@ -370,7 +383,9 @@ async fn txn_read() -> Result<()> { #[serial] async fn txn_bank_transfer() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut rng = thread_rng(); let options = TransactionOptions::new_optimistic() .use_async_commit() @@ -423,7 +438,8 @@ async fn txn_bank_transfer() -> Result<()> { #[serial] async fn raw_req() -> Result<()> { init().await?; - let client = RawClient::new(pd_addrs()).await?; + let client = + RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; // empty; get non-existent key let res = client.get("k1".to_owned()).await; @@ -553,7 +569,9 @@ async fn raw_req() -> Result<()> { #[serial] async fn txn_update_safepoint() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let res = client.gc(client.current_timestamp().await?).await?; assert!(res); Ok(()) @@ -568,7 +586,8 @@ async fn raw_write_million() -> Result<()> { let interval = 2u32.pow(32 - NUM_BITS_TXN - NUM_BITS_KEY_PER_TXN); init().await?; - let client = RawClient::new(pd_addrs()).await?; + let client = + RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; for i in 0..2u32.pow(NUM_BITS_TXN) { let mut cur = i * 2u32.pow(32 - NUM_BITS_TXN); @@ -702,7 +721,9 @@ async fn raw_write_million() -> Result<()> { #[serial] async fn txn_pessimistic_rollback() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut preload_txn = client.begin_optimistic().await?; let key1 = vec![1]; let key2 = vec![2]; @@ -734,7 +755,9 @@ async fn txn_pessimistic_rollback() -> Result<()> { #[serial] async fn txn_pessimistic_delete() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // The transaction will lock the keys and must release the locks on commit, // even when values are not written to the DB. @@ -785,7 +808,9 @@ async fn txn_pessimistic_delete() -> Result<()> { #[serial] async fn txn_lock_keys() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let k1 = b"key1".to_vec(); let k2 = b"key2".to_vec(); @@ -819,7 +844,9 @@ async fn txn_lock_keys() -> Result<()> { #[serial] async fn txn_lock_keys_error_handle() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // Keys in `k` should locate in different regions. See `init()` for boundary of regions. let k: Vec = vec![ @@ -856,7 +883,9 @@ async fn txn_lock_keys_error_handle() -> Result<()> { #[serial] async fn txn_get_for_update() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let key1 = "key".to_owned(); let key2 = "another key".to_owned(); let value1 = b"some value".to_owned(); @@ -903,7 +932,9 @@ async fn txn_pessimistic_heartbeat() -> Result<()> { let key1 = "key1".to_owned(); let key2 = "key2".to_owned(); - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut heartbeat_txn = client .begin_with_options(TransactionOptions::new_pessimistic()) @@ -943,7 +974,9 @@ async fn txn_pessimistic_heartbeat() -> Result<()> { #[serial] async fn raw_cas() -> Result<()> { init().await?; - let client = RawClient::new(pd_addrs()).await?.with_atomic_for_cas(); + let client = RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await? + .with_atomic_for_cas(); let key = "key".to_owned(); let value = "value".to_owned(); let new_value = "new value".to_owned(); @@ -986,7 +1019,8 @@ async fn raw_cas() -> Result<()> { client.batch_delete(vec![key.clone()]).await.err().unwrap(), Error::UnsupportedMode )); - let client = RawClient::new(pd_addrs()).await?; + let client = + RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?; assert!(matches!( client .compare_and_swap(key.clone(), None, vec![]) @@ -1003,7 +1037,9 @@ async fn raw_cas() -> Result<()> { #[serial] async fn txn_scan() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let k1 = b"a".to_vec(); let v = b"b".to_vec(); @@ -1026,7 +1062,9 @@ async fn txn_scan() -> Result<()> { #[serial] async fn txn_scan_reverse() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let k1 = b"k1".to_vec(); let k2 = b"k2".to_vec(); @@ -1099,7 +1137,9 @@ async fn txn_scan_reverse() -> Result<()> { #[serial] async fn txn_scan_reverse_multi_regions() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // Keys in `keys` should locate in different regions. See `init()` for boundary of regions. let keys: Vec = vec![ @@ -1143,7 +1183,9 @@ async fn txn_scan_reverse_multi_regions() -> Result<()> { #[serial] async fn txn_key_exists() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let key = "key".to_owned(); let value = "value".to_owned(); let mut t1 = client.begin_optimistic().await?; @@ -1166,7 +1208,9 @@ async fn txn_key_exists() -> Result<()> { #[serial] async fn txn_batch_mutate_optimistic() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // Put k0 { @@ -1177,7 +1221,7 @@ async fn txn_batch_mutate_optimistic() -> Result<()> { // Delete k0 and put k1, k2 do_mutate(false).await.unwrap(); // Read and verify - verify_mutate(false).await; + verify_mutate(false).await?; Ok(()) } @@ -1185,7 +1229,9 @@ async fn txn_batch_mutate_optimistic() -> Result<()> { #[serial] async fn txn_batch_mutate_pessimistic() -> Result<()> { init().await?; - let client = TransactionClient::new(pd_addrs()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; // Put k0 { @@ -1210,7 +1256,7 @@ async fn txn_batch_mutate_pessimistic() -> Result<()> { txn3_handle.await?.unwrap(); // Read and verify - verify_mutate(true).await; + verify_mutate(true).await?; Ok(()) } @@ -1227,27 +1273,15 @@ async fn begin_mutate(client: &TransactionClient, is_pessimistic: bool) -> Resul } async fn do_mutate(is_pessimistic: bool) -> Result<()> { - let client = TransactionClient::new(pd_addrs()).await.unwrap(); + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut txn = begin_mutate(&client, is_pessimistic).await.unwrap(); let mutations = vec![ - kvrpcpb::Mutation { - op: kvrpcpb::Op::Del.into(), - key: b"k0".to_vec(), - ..Default::default() - }, - kvrpcpb::Mutation { - op: kvrpcpb::Op::Put.into(), - key: b"k1".to_vec(), - value: b"v1".to_vec(), - ..Default::default() - }, - kvrpcpb::Mutation { - op: kvrpcpb::Op::Put.into(), - key: b"k2".to_vec(), - value: b"v2".to_vec(), - ..Default::default() - }, + Mutation::Delete(Key::from("k0".to_owned())), + Mutation::Put(Key::from("k1".to_owned()), Value::from("v1".to_owned())), + Mutation::Put(Key::from("k2".to_owned()), Value::from("v2".to_owned())), ]; match txn.batch_mutate(mutations).await { @@ -1262,8 +1296,10 @@ async fn do_mutate(is_pessimistic: bool) -> Result<()> { } } -async fn verify_mutate(is_pessimistic: bool) { - let client = TransactionClient::new(pd_addrs()).await.unwrap(); +async fn verify_mutate(is_pessimistic: bool) -> Result<()> { + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; let mut snapshot = snapshot(&client, is_pessimistic).await.unwrap(); let res: HashMap = snapshot .batch_get(vec!["k0".to_owned(), "k1".to_owned(), "k2".to_owned()]) @@ -1280,13 +1316,16 @@ async fn verify_mutate(is_pessimistic: bool) { res.get(&Key::from("k2".to_owned())), Some(Value::from("v2".to_owned())).as_ref() ); + Ok(()) } #[tokio::test] #[serial] async fn txn_unsafe_destroy_range() -> Result<()> { init().await?; - let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let client = + TransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) + .await?; const DATA_COUNT: usize = 10;