From 42f819a4bd0a516da8b8a9efeac38f04216853fa Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 15 Aug 2023 21:02:44 +0800 Subject: [PATCH 1/7] API v2 part1 Signed-off-by: Ping Yu --- src/mock.rs | 21 ++++++++++-- src/pd/client.rs | 38 ++++++++++++++++++---- src/raw/client.rs | 17 +++++++--- src/raw/requests.rs | 5 +++ src/request/codec.rs | 37 +++++++++++++++++++++ src/request/mod.rs | 6 ++++ src/request/plan.rs | 12 +++++-- src/request/plan_builder.rs | 19 +++++++---- src/request/shard.rs | 5 +-- src/store/request.rs | 6 ++++ src/transaction/client.rs | 59 +++++++++++++++++++++++++++------- src/transaction/snapshot.rs | 8 +++-- src/transaction/transaction.rs | 3 +- 13 files changed, 198 insertions(+), 38 deletions(-) create mode 100644 src/request/codec.rs diff --git a/src/mock.rs b/src/mock.rs index eada6a8e..887bb2d2 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -18,6 +18,7 @@ use crate::proto::metapb::RegionEpoch; use crate::proto::metapb::{self}; use crate::region::RegionId; use crate::region::RegionWithLeader; +use crate::request::codec::ApiV1Codec; use crate::store::KvClient; use crate::store::KvConnect; use crate::store::RegionStore; @@ -30,7 +31,7 @@ use crate::Timestamp; /// Create a `PdRpcClient` with it's internals replaced with mocks so that the /// client can be tested without doing any RPC calls. -pub async fn pd_rpc_client() -> PdRpcClient { +pub async fn pd_rpc_client() -> PdRpcClient { let config = Config::default(); PdRpcClient::new( config.clone(), @@ -43,6 +44,7 @@ pub async fn pd_rpc_client() -> PdRpcClient { )) }, false, + Some(ApiV1Codec::default()), ) .await .unwrap() @@ -71,9 +73,18 @@ pub struct MockKvConnect; pub struct MockCluster; -#[derive(new)] pub struct MockPdClient { client: MockKvClient, + codec: ApiV1Codec, +} + +impl MockPdClient { + pub fn new(client: MockKvClient) -> MockPdClient { + MockPdClient { + client, + codec: ApiV1Codec::default(), + } + } } #[async_trait] @@ -102,6 +113,7 @@ impl MockPdClient { pub fn default() -> MockPdClient { MockPdClient { client: MockKvClient::default(), + codec: ApiV1Codec::default(), } } @@ -165,6 +177,7 @@ impl MockPdClient { #[async_trait] impl PdClient for MockPdClient { + type Codec = ApiV1Codec; type KvClient = MockKvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { @@ -210,4 +223,8 @@ impl PdClient for MockPdClient { } async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {} + + fn get_codec(&self) -> &Self::Codec { + &self.codec + } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 31f88968..e22b7c90 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -20,6 +20,7 @@ use crate::region::RegionId; use crate::region::RegionVerId; use crate::region::RegionWithLeader; use crate::region_cache::RegionCache; +use crate::request::codec::{ApiV1Codec, Codec}; use crate::store::KvClient; use crate::store::KvConnect; use crate::store::RegionStore; @@ -50,6 +51,7 @@ use crate::Timestamp; /// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff. #[async_trait] pub trait PdClient: Send + Sync + 'static { + type Codec: Codec; type KvClient: KvClient + Send + Sync + 'static; /// In transactional API, `region` is decoded (keys in raw format). @@ -200,20 +202,30 @@ pub trait PdClient: Send + Sync + 'static { async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>; async fn invalidate_region_cache(&self, ver_id: RegionVerId); + + /// Get the codec carried by `PdClient`. + /// The purpose of carrying the codec is to reduce the passing of it on so many calling paths. + fn get_codec(&self) -> &Self::Codec; } /// This client converts requests for the logical TiKV cluster into requests /// for a single TiKV store using PD and internal logic. -pub struct PdRpcClient { +pub struct PdRpcClient< + Cod: Codec = ApiV1Codec, + KvC: KvConnect + Send + Sync + 'static = TikvConnect, + Cl = Cluster, +> { pd: Arc>, kv_connect: KvC, kv_client_cache: Arc>>, enable_codec: bool, region_cache: RegionCache>, + codec: Option, } #[async_trait] -impl PdClient for PdRpcClient { +impl PdClient for PdRpcClient { + type Codec = Cod; type KvClient = KvC::KvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { @@ -255,31 +267,40 @@ impl PdClient for PdRpcClient { async fn invalidate_region_cache(&self, ver_id: RegionVerId) { self.region_cache.invalidate_region_cache(ver_id).await } + + fn get_codec(&self) -> &Self::Codec { + self.codec + .as_ref() + .unwrap_or_else(|| panic!("codec not set")) + } } -impl PdRpcClient { +impl PdRpcClient { pub async fn connect( pd_endpoints: &[String], config: Config, enable_codec: bool, - ) -> Result { + codec: Option, + ) -> Result> { PdRpcClient::new( config.clone(), |security_mgr| TikvConnect::new(security_mgr, config.timeout), |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout), enable_codec, + codec, ) .await } } -impl PdRpcClient { +impl PdRpcClient { pub async fn new( config: Config, kv_connect: MakeKvC, pd: MakePd, enable_codec: bool, - ) -> Result> + codec: Option, + ) -> Result> where PdFut: Future>>, MakeKvC: FnOnce(Arc) -> KvC, @@ -303,6 +324,7 @@ impl PdRpcClient { kv_connect: kv_connect(security_mgr), enable_codec, region_cache: RegionCache::new(pd), + codec, }) } @@ -322,6 +344,10 @@ impl PdRpcClient { Err(e) => Err(e), } } + + pub fn set_codec(&mut self, codec: Cod) { + self.codec = Some(codec); + } } fn make_key_range(start_key: Vec, end_key: Vec) -> kvrpcpb::KeyRange { diff --git a/src/raw/client.rs b/src/raw/client.rs index 0bdc2f8b..b69662bc 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -15,6 +15,7 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::metapb; use crate::raw::lowering::*; +use crate::request::codec::{ApiV1Codec, Codec}; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Plan; @@ -35,7 +36,11 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; /// /// The returned results of raw request methods are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client { +pub struct Client> +where + Cod: Codec, + PdC: PdClient, +{ rpc: Arc, cf: Option, backoff: Backoff, @@ -54,7 +59,7 @@ impl Clone for Client { } } -impl Client { +impl Client { /// Create a raw [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -100,7 +105,9 @@ impl Client { config: Config, ) -> Result { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?); + let rpc = Arc::new( + PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1Codec::default())).await?, + ); Ok(Client { rpc, cf: None, @@ -142,7 +149,9 @@ impl Client { atomic: self.atomic, } } +} +impl Client> { /// Set the [`Backoff`] strategy for retrying requests. /// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF). /// See [`Backoff`] for more information. @@ -189,7 +198,7 @@ impl Client { } } -impl Client { +impl> Client { /// Create a new 'get' request. /// /// Once resolved this request will result in the fetching of the value associated with the diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 107b8807..e0e64c5e 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -13,6 +13,7 @@ use super::RawRpcRequest; use crate::collect_first; use crate::pd::PdClient; use crate::proto::kvrpcpb; +use crate::proto::kvrpcpb::ApiVersion; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; use crate::request::plan::ResponseWithShard; @@ -397,6 +398,10 @@ impl Request for RawCoprocessorRequest { fn set_context(&mut self, context: kvrpcpb::Context) { self.inner.set_context(context); } + + fn set_api_version(&mut self, api_version: ApiVersion) { + self.inner.set_api_version(api_version); + } } impl KvRequest for RawCoprocessorRequest { diff --git a/src/request/codec.rs b/src/request/codec.rs new file mode 100644 index 00000000..be5716ca --- /dev/null +++ b/src/request/codec.rs @@ -0,0 +1,37 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::proto::kvrpcpb; +use crate::request::KvRequest; +use std::borrow::Cow; + +pub trait Codec: Clone + Sync + Send + 'static { + fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> { + Cow::Borrowed(req) + } +} + +#[derive(Clone, Default)] +pub struct ApiV1Codec {} + +impl Codec for ApiV1Codec {} + +#[derive(Clone)] +pub struct ApiV2Codec { + _keyspace_id: u32, +} + +impl ApiV2Codec { + pub fn new(keyspace_id: u32) -> Self { + Self { + _keyspace_id: keyspace_id, + } + } +} + +impl Codec for ApiV2Codec { + fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> { + let mut req = req.clone(); + req.set_api_version(kvrpcpb::ApiVersion::V2); + Cow::Owned(req) + } +} diff --git a/src/request/mod.rs b/src/request/mod.rs index a1ac759e..22839443 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -32,6 +32,7 @@ use crate::store::HasKeyErrors; use crate::store::Request; use crate::transaction::HasLocks; +pub mod codec; pub mod plan; mod plan_builder; mod shard; @@ -88,6 +89,7 @@ mod test { use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; + use crate::proto::kvrpcpb::ApiVersion; use crate::proto::pdpb::Timestamp; use crate::proto::tikvpb::tikv_client::TikvClient; use crate::store::store_stream_for_keys; @@ -138,6 +140,10 @@ mod test { fn set_context(&mut self, _: kvrpcpb::Context) { unreachable!(); } + + fn set_api_version(&mut self, _api_version: ApiVersion) { + unreachable!(); + } } #[async_trait] diff --git a/src/request/plan.rs b/src/request/plan.rs index 905e7fad..2f10d5c7 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -17,6 +17,7 @@ use crate::pd::PdClient; use crate::proto::errorpb; use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; +use crate::request::codec::Codec; use crate::request::shard::HasNextBatch; use crate::request::KvRequest; use crate::request::NextBatch; @@ -48,22 +49,27 @@ pub trait Plan: Sized + Clone + Sync + Send + 'static { /// The simplest plan which just dispatches a request to a specific kv server. #[derive(Clone)] -pub struct Dispatch { +pub struct Dispatch { pub request: Req, pub kv_client: Option>, + pub codec: Cod, } #[async_trait] -impl Plan for Dispatch { +impl Plan for Dispatch { type Result = Req::Response; async fn execute(&self) -> Result { + // `encode_request` will clone the request, which would have high overhead. + // TODO: consider in-place encoding. + let req = self.codec.encode_request(&self.request); + let stats = tikv_stats(self.request.label()); let result = self .kv_client .as_ref() .expect("Unreachable: kv_client has not been initialised in Dispatch") - .dispatch(&self.request) + .dispatch(req.as_ref()) .await; let result = stats.done(result); result.map(|r| { diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 5ce2350c..678c9686 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use super::plan::PreserveShard; use crate::backoff::Backoff; use crate::pd::PdClient; +use crate::request::codec::Codec; use crate::request::plan::CleanupLocks; use crate::request::shard::HasNextBatch; use crate::request::DefaultProcessor; @@ -45,13 +46,17 @@ impl PlanBuilderPhase for NoTarget {} pub struct Targetted; impl PlanBuilderPhase for Targetted {} -impl PlanBuilder, NoTarget> { +impl, Req: KvRequest> + PlanBuilder, NoTarget> +{ pub fn new(pd_client: Arc, request: Req) -> Self { + let codec = pd_client.get_codec().clone(); PlanBuilder { pd_client, plan: Dispatch { request, kv_client: None, + codec, }, phantom: PhantomData, } @@ -183,12 +188,14 @@ where } } -impl PlanBuilder, NoTarget> { +impl, R: KvRequest> + PlanBuilder, NoTarget> +{ /// Target the request at a single region; caller supplies the store to target. pub async fn single_region_with_store( self, store: RegionStore, - ) -> Result, Targetted>> { + ) -> Result, Targetted>> { set_single_region_store(self.plan, store, self.pd_client) } } @@ -222,11 +229,11 @@ where } } -fn set_single_region_store( - mut plan: Dispatch, +fn set_single_region_store, R: KvRequest>( + mut plan: Dispatch, store: RegionStore, pd_client: Arc, -) -> Result, Targetted>> { +) -> Result, Targetted>> { plan.request .set_context(store.region_with_leader.context()?); plan.kv_client = Some(store.client); diff --git a/src/request/shard.rs b/src/request/shard.rs index aaefab72..413ed9ed 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -6,6 +6,7 @@ use futures::stream::BoxStream; use super::plan::PreserveShard; use crate::pd::PdClient; +use crate::request::codec::Codec; use crate::request::plan::CleanupLocks; use crate::request::Dispatch; use crate::request::KvRequest; @@ -80,7 +81,7 @@ pub trait NextBatch { fn next_batch(&mut self, _range: (Vec, Vec)); } -impl Shardable for Dispatch { +impl Shardable for Dispatch { type Shard = Req::Shard; fn shards( @@ -96,7 +97,7 @@ impl Shardable for Dispatch { } } -impl NextBatch for Dispatch { +impl NextBatch for Dispatch { fn next_batch(&mut self, range: (Vec, Vec)) { self.request.next_batch(range); } diff --git a/src/store/request.rs b/src/store/request.rs index 5060eb01..a26e57e0 100644 --- a/src/store/request.rs +++ b/src/store/request.rs @@ -22,6 +22,7 @@ pub trait Request: Any + Sync + Send + 'static { fn label(&self) -> &'static str; fn as_any(&self) -> &dyn Any; fn set_context(&mut self, context: kvrpcpb::Context); + fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion); } macro_rules! impl_request { @@ -54,6 +55,11 @@ macro_rules! impl_request { fn set_context(&mut self, context: kvrpcpb::Context) { self.context = Some(context); } + + fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { + let context = self.context.get_or_insert(kvrpcpb::Context::default()); + context.api_version = api_version.into(); + } } }; } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 64d32451..4212079c 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -10,6 +10,7 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; +use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec}; use crate::request::plan::CleanupLocksResult; use crate::request::Plan; use crate::timestamp::TimestampExt; @@ -42,11 +43,11 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client { - pd: Arc, +pub struct Client { + pd: Arc>, } -impl Clone for Client { +impl Clone for Client { fn clone(&self) -> Self { Self { pd: self.pd.clone(), @@ -54,7 +55,7 @@ impl Clone for Client { } } -impl Client { +impl Client { /// Create a transactional [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -71,7 +72,6 @@ impl Client { /// # }); /// ``` pub async fn new>(pd_endpoints: Vec) -> Result { - // debug!("creating transactional client"); Self::new_with_config(pd_endpoints, Config::default()).await } @@ -100,9 +100,35 @@ impl Client { pd_endpoints: Vec, config: Config, ) -> Result { + Self::new_with_codec(pd_endpoints, config, ApiV1Codec::default()).await + } +} + +impl Client { + pub async fn new_with_config_v2>( + _keyspace_name: &str, + pd_endpoints: Vec, + config: Config, + ) -> Result> { + debug!("creating new transactional client APIv2"); + let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); + let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?; + let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name) + pd.set_codec(ApiV2Codec::new(keyspace_id)); + Ok(Client { pd: Arc::new(pd) }) + } +} + +impl Client { + pub async fn new_with_codec>( + pd_endpoints: Vec, + config: Config, + codec: Cod, + ) -> Result> { debug!("creating new transactional client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?); + let pd = + Arc::new(PdRpcClient::::connect(&pd_endpoints, config, true, Some(codec)).await?); Ok(Client { pd }) } @@ -126,7 +152,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_optimistic(&self) -> Result { + pub async fn begin_optimistic(&self) -> Result>> { debug!("creating new optimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic())) @@ -149,7 +175,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_pessimistic(&self) -> Result { + pub async fn begin_pessimistic(&self) -> Result>> { debug!("creating new pessimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic())) @@ -172,14 +198,21 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_with_options(&self, options: TransactionOptions) -> Result { + pub async fn begin_with_options( + &self, + options: TransactionOptions, + ) -> Result>> { debug!("creating new customized transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, options)) } /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). - pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { + pub fn snapshot( + &self, + timestamp: Timestamp, + options: TransactionOptions, + ) -> Snapshot> { debug!("creating new snapshot"); Snapshot::new(self.new_transaction(timestamp, options.read_only())) } @@ -272,7 +305,11 @@ impl Client { plan.execute().await } - fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { + fn new_transaction( + &self, + timestamp: Timestamp, + options: TransactionOptions, + ) -> Transaction> { Transaction::new(timestamp, self.pd.clone(), options) } } diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index 5694614b..2ae15769 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -3,6 +3,8 @@ use derive_new::new; use log::debug; +use crate::pd::{PdClient, PdRpcClient}; +use crate::request::codec::Codec; use crate::BoundRange; use crate::Key; use crate::KvPair; @@ -18,11 +20,11 @@ use crate::Value; /// /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods. #[derive(new)] -pub struct Snapshot { - transaction: Transaction, +pub struct Snapshot { + transaction: Transaction, } -impl Snapshot { +impl> Snapshot { /// Get the value associated with the given key. pub async fn get(&mut self, key: impl Into) -> Result> { debug!("invoking get request on snapshot"); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 44532f7e..f4b49a3e 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -18,6 +18,7 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; +use crate::request::codec::Codec; use crate::request::Collect; use crate::request::CollectError; use crate::request::CollectSingle; @@ -83,7 +84,7 @@ pub struct Transaction { start_instant: Instant, } -impl Transaction { +impl> Transaction { pub(crate) fn new( timestamp: Timestamp, rpc: Arc, From 40cdb54082c17625d2f534c80d065de050a1b717 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 25 Aug 2023 20:28:43 +0800 Subject: [PATCH 2/7] inplace encoding Signed-off-by: Ping Yu --- src/lib.rs | 2 ++ src/raw/client.rs | 35 +++++++++++++++++++++----------- src/raw/requests.rs | 4 +++- src/request/codec.rs | 24 +++++++++++++++------- src/request/mod.rs | 13 +++++++++--- src/request/plan.rs | 12 +++-------- src/request/plan_builder.rs | 24 +++++++++------------- src/request/shard.rs | 5 ++--- src/transaction/client.rs | 8 +++++--- src/transaction/lock.rs | 16 ++++++++++----- src/transaction/transaction.rs | 37 ++++++++++++++++++++++------------ 11 files changed, 109 insertions(+), 71 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 60dc2956..b9ba66c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -145,6 +145,8 @@ pub use crate::raw::Client as RawClient; #[doc(inline)] pub use crate::raw::ColumnFamily; #[doc(inline)] +pub use crate::request::codec; +#[doc(inline)] pub use crate::request::RetryOptions; #[doc(inline)] pub use crate::timestamp::Timestamp; diff --git a/src/raw/client.rs b/src/raw/client.rs index b69662bc..5387ac04 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -15,7 +15,7 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::metapb; use crate::raw::lowering::*; -use crate::request::codec::{ApiV1Codec, Codec}; +use crate::request::codec::{ApiV1Codec, Codec, EncodedRequest}; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Plan; @@ -220,7 +220,8 @@ impl> Client { pub async fn get(&self, key: impl Into) -> Result> { debug!("invoking raw get request"); let request = new_raw_get_request(key.into(), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -252,7 +253,8 @@ impl> Client { ) -> Result> { debug!("invoking raw batch_get request"); let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); @@ -280,7 +282,8 @@ impl> Client { pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking raw put request"); let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -316,7 +319,8 @@ impl> Client { self.cf.clone(), self.atomic, ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -344,7 +348,8 @@ impl> Client { pub async fn delete(&self, key: impl Into) -> Result<()> { debug!("invoking raw delete request"); let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -375,7 +380,8 @@ impl> Client { self.assert_non_atomic()?; let request = new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -402,7 +408,8 @@ impl> Client { debug!("invoking raw delete_range request"); self.assert_non_atomic()?; let request = new_raw_delete_range_request(range.into(), self.cf.clone()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -558,7 +565,8 @@ impl> Client { previous_value.into(), self.cf.clone(), ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -581,7 +589,8 @@ impl> Client { ranges.into_iter().map(Into::into), request_builder, ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .preserve_shard() .retry_multi_region(self.backoff.clone()) .post_process_default() @@ -615,7 +624,8 @@ impl> Client { while cur_limit > 0 { let request = new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); - let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let resp = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .single_region_with_store(region_store.clone()) .await? .plan() @@ -670,7 +680,8 @@ impl> Client { key_only, self.cf.clone(), ); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); diff --git a/src/raw/requests.rs b/src/raw/requests.rs index e0e64c5e..ff1cc14f 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -501,6 +501,7 @@ mod test { use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; + use crate::request::codec::EncodedRequest; use crate::request::Plan; use crate::Key; @@ -535,7 +536,8 @@ mod test { key_only: true, ..Default::default() }; - let plan = crate::request::PlanBuilder::new(client, scan) + let encoded_scan = EncodedRequest::new(scan, client.get_codec()); + let plan = crate::request::PlanBuilder::new(client, encoded_scan) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) diff --git a/src/request/codec.rs b/src/request/codec.rs index be5716ca..06e447c6 100644 --- a/src/request/codec.rs +++ b/src/request/codec.rs @@ -2,12 +2,10 @@ use crate::proto::kvrpcpb; use crate::request::KvRequest; -use std::borrow::Cow; pub trait Codec: Clone + Sync + Send + 'static { - fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> { - Cow::Borrowed(req) - } + fn encode_request(&self, _req: &mut R) {} + // TODO: fn decode_response() } #[derive(Clone, Default)] @@ -29,9 +27,21 @@ impl ApiV2Codec { } impl Codec for ApiV2Codec { - fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> { - let mut req = req.clone(); + fn encode_request(&self, req: &mut R) { req.set_api_version(kvrpcpb::ApiVersion::V2); - Cow::Owned(req) + // TODO: req.encode_request(self); + } +} + +// EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake. +#[derive(Clone)] +pub struct EncodedRequest { + pub inner: Req, +} + +impl EncodedRequest { + pub fn new(mut req: Req, codec: &C) -> Self { + codec.encode_request(&mut req); + Self { inner: req } } } diff --git a/src/request/mod.rs b/src/request/mod.rs index 22839443..55746c5c 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -42,6 +42,9 @@ mod shard; pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { /// The expected response to the request. type Response: HasKeyErrors + HasLocks + Clone + Send + 'static; + + // TODO: fn encode_request() + // TODO: fn decode_response() } #[derive(Clone, Debug, new, Eq, PartialEq)] @@ -88,10 +91,12 @@ mod test { use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; + use crate::pd::PdClient; use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::ApiVersion; use crate::proto::pdpb::Timestamp; use crate::proto::tikvpb::tikv_client::TikvClient; + use crate::request::codec::EncodedRequest; use crate::store::store_stream_for_keys; use crate::store::HasRegionError; use crate::transaction::lowering::new_commit_request; @@ -189,7 +194,8 @@ mod test { |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box), ))); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) + let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3)) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() @@ -213,16 +219,17 @@ mod test { let key: Key = "key".to_owned().into(); let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default()); + let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); // does not extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone()) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone()) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); assert!(plan.execute().await.is_ok()); // extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() diff --git a/src/request/plan.rs b/src/request/plan.rs index 2f10d5c7..905e7fad 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -17,7 +17,6 @@ use crate::pd::PdClient; use crate::proto::errorpb; use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; -use crate::request::codec::Codec; use crate::request::shard::HasNextBatch; use crate::request::KvRequest; use crate::request::NextBatch; @@ -49,27 +48,22 @@ pub trait Plan: Sized + Clone + Sync + Send + 'static { /// The simplest plan which just dispatches a request to a specific kv server. #[derive(Clone)] -pub struct Dispatch { +pub struct Dispatch { pub request: Req, pub kv_client: Option>, - pub codec: Cod, } #[async_trait] -impl Plan for Dispatch { +impl Plan for Dispatch { type Result = Req::Response; async fn execute(&self) -> Result { - // `encode_request` will clone the request, which would have high overhead. - // TODO: consider in-place encoding. - let req = self.codec.encode_request(&self.request); - let stats = tikv_stats(self.request.label()); let result = self .kv_client .as_ref() .expect("Unreachable: kv_client has not been initialised in Dispatch") - .dispatch(req.as_ref()) + .dispatch(&self.request) .await; let result = stats.done(result); result.map(|r| { diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 678c9686..88a5e9ad 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use super::plan::PreserveShard; use crate::backoff::Backoff; use crate::pd::PdClient; -use crate::request::codec::Codec; +use crate::request::codec::EncodedRequest; use crate::request::plan::CleanupLocks; use crate::request::shard::HasNextBatch; use crate::request::DefaultProcessor; @@ -46,17 +46,13 @@ impl PlanBuilderPhase for NoTarget {} pub struct Targetted; impl PlanBuilderPhase for Targetted {} -impl, Req: KvRequest> - PlanBuilder, NoTarget> -{ - pub fn new(pd_client: Arc, request: Req) -> Self { - let codec = pd_client.get_codec().clone(); +impl PlanBuilder, NoTarget> { + pub fn new(pd_client: Arc, encoded_request: EncodedRequest) -> Self { PlanBuilder { pd_client, plan: Dispatch { - request, + request: encoded_request.inner, kv_client: None, - codec, }, phantom: PhantomData, } @@ -188,14 +184,12 @@ where } } -impl, R: KvRequest> - PlanBuilder, NoTarget> -{ +impl PlanBuilder, NoTarget> { /// Target the request at a single region; caller supplies the store to target. pub async fn single_region_with_store( self, store: RegionStore, - ) -> Result, Targetted>> { + ) -> Result, Targetted>> { set_single_region_store(self.plan, store, self.pd_client) } } @@ -229,11 +223,11 @@ where } } -fn set_single_region_store, R: KvRequest>( - mut plan: Dispatch, +fn set_single_region_store( + mut plan: Dispatch, store: RegionStore, pd_client: Arc, -) -> Result, Targetted>> { +) -> Result, Targetted>> { plan.request .set_context(store.region_with_leader.context()?); plan.kv_client = Some(store.client); diff --git a/src/request/shard.rs b/src/request/shard.rs index 413ed9ed..aaefab72 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -6,7 +6,6 @@ use futures::stream::BoxStream; use super::plan::PreserveShard; use crate::pd::PdClient; -use crate::request::codec::Codec; use crate::request::plan::CleanupLocks; use crate::request::Dispatch; use crate::request::KvRequest; @@ -81,7 +80,7 @@ pub trait NextBatch { fn next_batch(&mut self, _range: (Vec, Vec)); } -impl Shardable for Dispatch { +impl Shardable for Dispatch { type Shard = Req::Shard; fn shards( @@ -97,7 +96,7 @@ impl Shardable for Dispatch { } } -impl NextBatch for Dispatch { +impl NextBatch for Dispatch { fn next_batch(&mut self, range: (Vec, Vec)) { self.request.next_batch(range); } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 4212079c..8d1cf43d 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -10,7 +10,7 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; -use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec}; +use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec, EncodedRequest}; use crate::request::plan::CleanupLocksResult; use crate::request::Plan; use crate::timestamp::TimestampExt; @@ -279,7 +279,8 @@ impl Client { let ctx = ResolveLocksContext::default(); let backoff = Backoff::equal_jitter_backoff(100, 10000, 50); let req = new_scan_lock_request(range.into(), safepoint, options.batch_size); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), req) + let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) .cleanup_locks(ctx.clone(), options, backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() @@ -298,7 +299,8 @@ impl Client { batch_size: u32, ) -> Result> { let req = new_scan_lock_request(range.into(), safepoint, batch_size); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), req) + let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); + let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(crate::request::Collect) .plan(); diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 25522422..c55ab381 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -17,6 +17,7 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::pdpb::Timestamp; use crate::region::RegionVerId; +use crate::request::codec::EncodedRequest; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Plan; @@ -77,7 +78,8 @@ pub async fn resolve_locks( Some(&commit_version) => commit_version, None => { let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) + let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -118,8 +120,9 @@ async fn resolve_lock_with_retry( let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); + let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); // The only place where single-region is used - let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .single_region_with_store(store) .await? .resolve_lock(Backoff::no_backoff()) @@ -359,7 +362,8 @@ impl LockResolver { force_sync_commit, resolving_pessimistic_lock, ); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) + let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .extract_error() @@ -383,7 +387,8 @@ impl LockResolver { txn_id: u64, ) -> Result { let req = new_check_secondary_locks_request(keys, txn_id); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) + let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .merge(Collect) @@ -399,7 +404,8 @@ impl LockResolver { ) -> Result { let ver_id = store.region_with_leader.ver_id(); let request = requests::new_batch_resolve_lock_request(txn_infos.clone()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) + let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); + let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .single_region_with_store(store.clone()) .await? .extract_error() diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index f4b49a3e..b28ea2b3 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -18,7 +18,7 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; -use crate::request::codec::Codec; +use crate::request::codec::{Codec, EncodedRequest}; use crate::request::Collect; use crate::request::CollectError; use crate::request::CollectSingle; @@ -134,7 +134,7 @@ impl> Transaction { self.buffer .get_or_else(key, |key| async move { - let request = new_get_request(key, timestamp); + let request = EncodedRequest::new(new_get_request(key, timestamp), rpc.get_codec()); let plan = PlanBuilder::new(rpc, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) @@ -265,7 +265,8 @@ impl> Transaction { self.buffer .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move { let request = new_batch_get_request(keys, timestamp); - let plan = PlanBuilder::new(rpc, request) + let encoded_req = EncodedRequest::new(request, rpc.get_codec()); + let plan = PlanBuilder::new(rpc, encoded_req) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) @@ -692,7 +693,8 @@ impl> Transaction { primary_key, self.start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let plan = PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectSingle) @@ -722,7 +724,8 @@ impl> Transaction { move |new_range, new_limit| async move { let request = new_scan_request(new_range, timestamp, new_limit, key_only, reverse); - let plan = PlanBuilder::new(rpc, request) + let encoded_req = EncodedRequest::new(request, rpc.get_codec()); + let plan = PlanBuilder::new(rpc, encoded_req) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) @@ -777,7 +780,8 @@ impl> Transaction { for_update_ts.clone(), need_value, ); - let plan = PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) @@ -831,7 +835,8 @@ impl> Transaction { start_version, for_update_ts, ); - let plan = PlanBuilder::new(self.rpc.clone(), req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -901,7 +906,8 @@ impl> Transaction { primary_key.clone(), start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let plan = PlanBuilder::new(rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, rpc.get_codec()); + let plan = PlanBuilder::new(rpc.clone(), encoded_req) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) .plan(); @@ -1210,7 +1216,8 @@ impl Committer { .collect(); // FIXME set max_commit_ts and min_commit_ts - let plan = PlanBuilder::new(self.rpc.clone(), request) + let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) @@ -1250,7 +1257,8 @@ impl Committer { self.start_version.clone(), commit_version.clone(), ); - let plan = PlanBuilder::new(self.rpc.clone(), req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -1314,7 +1322,8 @@ impl Committer { .filter(|key| &primary_key != key); new_commit_request(keys, self.start_version, commit_version) }; - let plan = PlanBuilder::new(self.rpc, req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc, encoded_req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() @@ -1335,7 +1344,8 @@ impl Committer { match self.options.kind { TransactionKind::Optimistic => { let req = new_batch_rollback_request(keys, self.start_version); - let plan = PlanBuilder::new(self.rpc, req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc, encoded_req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() @@ -1344,7 +1354,8 @@ impl Committer { } TransactionKind::Pessimistic(for_update_ts) => { let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); - let plan = PlanBuilder::new(self.rpc, req) + let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); + let plan = PlanBuilder::new(self.rpc, encoded_req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() From 9201cf6b80d6bf57ce9f63ceccd7c681627c2525 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 25 Aug 2023 20:55:09 +0800 Subject: [PATCH 3/7] polish Signed-off-by: Ping Yu --- src/mock.rs | 14 +++++++------- src/pd/client.rs | 31 +++++++++++++++++-------------- src/raw/client.rs | 9 +++++---- src/request/codec.rs | 17 ++++++++++++----- src/transaction/client.rs | 14 +++++++------- src/transaction/lock.rs | 1 - 6 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/mock.rs b/src/mock.rs index 887bb2d2..3b0b157f 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -18,7 +18,7 @@ use crate::proto::metapb::RegionEpoch; use crate::proto::metapb::{self}; use crate::region::RegionId; use crate::region::RegionWithLeader; -use crate::request::codec::ApiV1Codec; +use crate::request::codec::ApiV1TxnCodec; use crate::store::KvClient; use crate::store::KvConnect; use crate::store::RegionStore; @@ -31,7 +31,7 @@ use crate::Timestamp; /// Create a `PdRpcClient` with it's internals replaced with mocks so that the /// client can be tested without doing any RPC calls. -pub async fn pd_rpc_client() -> PdRpcClient { +pub async fn pd_rpc_client() -> PdRpcClient { let config = Config::default(); PdRpcClient::new( config.clone(), @@ -44,7 +44,7 @@ pub async fn pd_rpc_client() -> PdRpcClient MockPdClient { MockPdClient { client, - codec: ApiV1Codec::default(), + codec: ApiV1TxnCodec::default(), } } } @@ -113,7 +113,7 @@ impl MockPdClient { pub fn default() -> MockPdClient { MockPdClient { client: MockKvClient::default(), - codec: ApiV1Codec::default(), + codec: ApiV1TxnCodec::default(), } } @@ -177,7 +177,7 @@ impl MockPdClient { #[async_trait] impl PdClient for MockPdClient { - type Codec = ApiV1Codec; + type Codec = ApiV1TxnCodec; type KvClient = MockKvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { diff --git a/src/pd/client.rs b/src/pd/client.rs index e22b7c90..f291c62c 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -20,7 +20,7 @@ use crate::region::RegionId; use crate::region::RegionVerId; use crate::region::RegionWithLeader; use crate::region_cache::RegionCache; -use crate::request::codec::{ApiV1Codec, Codec}; +use crate::request::codec::{ApiV1TxnCodec, Codec}; use crate::store::KvClient; use crate::store::KvConnect; use crate::store::RegionStore; @@ -191,8 +191,11 @@ pub trait PdClient: Send + Sync + 'static { .boxed() } - fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result { - if enable_codec { + fn decode_region( + mut region: RegionWithLeader, + enable_mvcc_codec: bool, + ) -> Result { + if enable_mvcc_codec { codec::decode_bytes_in_place(&mut region.region.start_key, false)?; codec::decode_bytes_in_place(&mut region.region.end_key, false)?; } @@ -204,21 +207,21 @@ pub trait PdClient: Send + Sync + 'static { async fn invalidate_region_cache(&self, ver_id: RegionVerId); /// Get the codec carried by `PdClient`. - /// The purpose of carrying the codec is to reduce the passing of it on so many calling paths. + /// The purpose of carrying the codec is to avoid passing it on so many calling paths. fn get_codec(&self) -> &Self::Codec; } /// This client converts requests for the logical TiKV cluster into requests /// for a single TiKV store using PD and internal logic. pub struct PdRpcClient< - Cod: Codec = ApiV1Codec, + Cod: Codec = ApiV1TxnCodec, KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster, > { pd: Arc>, kv_connect: KvC, kv_client_cache: Arc>>, - enable_codec: bool, + enable_mvcc_codec: bool, region_cache: RegionCache>, codec: Option, } @@ -236,20 +239,20 @@ impl PdClient for PdRpcClien } async fn region_for_key(&self, key: &Key) -> Result { - let enable_codec = self.enable_codec; - let key = if enable_codec { + let enable_mvcc_codec = self.enable_mvcc_codec; + let key = if enable_mvcc_codec { key.to_encoded() } else { key.clone() }; let region = self.region_cache.get_region_by_key(&key).await?; - Self::decode_region(region, enable_codec) + Self::decode_region(region, enable_mvcc_codec) } async fn region_for_id(&self, id: RegionId) -> Result { let region = self.region_cache.get_region_by_id(id).await?; - Self::decode_region(region, self.enable_codec) + Self::decode_region(region, self.enable_mvcc_codec) } async fn get_timestamp(self: Arc) -> Result { @@ -279,14 +282,14 @@ impl PdRpcClient { pub async fn connect( pd_endpoints: &[String], config: Config, - enable_codec: bool, + enable_mvcc_codec: bool, // TODO: infer from `codec`. codec: Option, ) -> Result> { PdRpcClient::new( config.clone(), |security_mgr| TikvConnect::new(security_mgr, config.timeout), |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout), - enable_codec, + enable_mvcc_codec, codec, ) .await @@ -298,7 +301,7 @@ impl PdRpcClient, ) -> Result> where @@ -322,7 +325,7 @@ impl PdRpcClient> +pub struct Client> where Cod: Codec, PdC: PdClient, @@ -59,7 +59,7 @@ impl Clone for Client { } } -impl Client { +impl Client> { /// Create a raw [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -106,7 +106,8 @@ impl Client { ) -> Result { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let rpc = Arc::new( - PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1Codec::default())).await?, + PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1RawCodec::default())) + .await?, ); Ok(Client { rpc, diff --git a/src/request/codec.rs b/src/request/codec.rs index 06e447c6..a409a8e7 100644 --- a/src/request/codec.rs +++ b/src/request/codec.rs @@ -9,16 +9,21 @@ pub trait Codec: Clone + Sync + Send + 'static { } #[derive(Clone, Default)] -pub struct ApiV1Codec {} +pub struct ApiV1TxnCodec {} -impl Codec for ApiV1Codec {} +impl Codec for ApiV1TxnCodec {} + +#[derive(Clone, Default)] +pub struct ApiV1RawCodec {} + +impl Codec for ApiV1RawCodec {} #[derive(Clone)] -pub struct ApiV2Codec { +pub struct ApiV2TxnCodec { _keyspace_id: u32, } -impl ApiV2Codec { +impl ApiV2TxnCodec { pub fn new(keyspace_id: u32) -> Self { Self { _keyspace_id: keyspace_id, @@ -26,13 +31,15 @@ impl ApiV2Codec { } } -impl Codec for ApiV2Codec { +impl Codec for ApiV2TxnCodec { fn encode_request(&self, req: &mut R) { req.set_api_version(kvrpcpb::ApiVersion::V2); // TODO: req.encode_request(self); } } +// TODO: pub struct ApiV2RawCodec + // EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake. #[derive(Clone)] pub struct EncodedRequest { diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 8d1cf43d..e4d40618 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -10,7 +10,7 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; -use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec, EncodedRequest}; +use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest}; use crate::request::plan::CleanupLocksResult; use crate::request::Plan; use crate::timestamp::TimestampExt; @@ -43,7 +43,7 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client { +pub struct Client { pd: Arc>, } @@ -55,7 +55,7 @@ impl Clone for Client { } } -impl Client { +impl Client { /// Create a transactional [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -100,21 +100,21 @@ impl Client { pd_endpoints: Vec, config: Config, ) -> Result { - Self::new_with_codec(pd_endpoints, config, ApiV1Codec::default()).await + Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await } } -impl Client { +impl Client { pub async fn new_with_config_v2>( _keyspace_name: &str, pd_endpoints: Vec, config: Config, - ) -> Result> { + ) -> Result> { debug!("creating new transactional client APIv2"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?; let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name) - pd.set_codec(ApiV2Codec::new(keyspace_id)); + pd.set_codec(ApiV2TxnCodec::new(keyspace_id)); Ok(Client { pd: Arc::new(pd) }) } } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index c55ab381..0ccb5e46 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -121,7 +121,6 @@ async fn resolve_lock_with_retry( let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - // The only place where single-region is used let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .single_region_with_store(store) .await? From c9fbf69e30187c9af8b23ea99025d3b7ac9a3a06 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 28 Aug 2023 11:22:17 +0800 Subject: [PATCH 4/7] polish Signed-off-by: Ping Yu --- src/request/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/request/mod.rs b/src/request/mod.rs index 55746c5c..429adea5 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -146,9 +146,7 @@ mod test { unreachable!(); } - fn set_api_version(&mut self, _api_version: ApiVersion) { - unreachable!(); - } + fn set_api_version(&mut self, _api_version: ApiVersion) {} } #[async_trait] From 12b217db644c4187adb30403ee7db2d6817b7136 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 28 Aug 2023 12:25:00 +0800 Subject: [PATCH 5/7] export proto Signed-off-by: Ping Yu --- src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index b9ba66c0..a2acf57b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,6 +94,8 @@ pub mod backoff; #[doc(hidden)] +pub mod proto; // export `proto` to enable user customized codec +#[doc(hidden)] pub mod raw; pub mod request; #[doc(hidden)] @@ -104,7 +106,6 @@ mod compat; mod config; mod kv; mod pd; -mod proto; mod region; mod region_cache; mod stats; From 19f829d46cbd7fe51c0dbe521b6163b2385ae410 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 28 Aug 2023 17:11:36 +0800 Subject: [PATCH 6/7] fix set_context Signed-off-by: Ping Yu --- src/raw/requests.rs | 4 ++-- src/request/shard.rs | 6 +++--- src/store/request.rs | 8 ++++++++ src/transaction/requests.rs | 13 +++++++------ 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index ff1cc14f..23bfce73 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -162,7 +162,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.pairs = shard; Ok(()) } @@ -293,7 +293,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.ranges = shard; Ok(()) } diff --git a/src/request/shard.rs b/src/request/shard.rs index aaefab72..7c78743d 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -163,7 +163,7 @@ macro_rules! shardable_key { mut shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); assert!(shard.len() == 1); self.key = shard.pop().unwrap(); Ok(()) @@ -196,7 +196,7 @@ macro_rules! shardable_keys { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -225,7 +225,7 @@ macro_rules! shardable_range { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.start_key = shard.0.into(); self.end_key = shard.1.into(); diff --git a/src/store/request.rs b/src/store/request.rs index a26e57e0..2f1a31a0 100644 --- a/src/store/request.rs +++ b/src/store/request.rs @@ -21,6 +21,8 @@ pub trait Request: Any + Sync + Send + 'static { ) -> Result>; fn label(&self) -> &'static str; fn as_any(&self) -> &dyn Any; + /// Set the context for the request. + /// Should always use `set_context` other than modify the `self.context` directly. fn set_context(&mut self, context: kvrpcpb::Context); fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion); } @@ -53,7 +55,13 @@ macro_rules! impl_request { } fn set_context(&mut self, context: kvrpcpb::Context) { + let api_version = self + .context + .as_ref() + .map(|c| c.api_version) + .unwrap_or_default(); self.context = Some(context); + self.set_api_version(kvrpcpb::ApiVersion::from_i32(api_version).unwrap()); } fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 9cc4993f..bd3852e3 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -38,6 +38,7 @@ use crate::shardable_range; use crate::store::store_stream_for_keys; use crate::store::store_stream_for_range; use crate::store::RegionStore; +use crate::store::Request; use crate::timestamp::TimestampExt; use crate::transaction::HasLocks; use crate::util::iter::FlatMapOkIterExt; @@ -294,7 +295,7 @@ impl Shardable for kvrpcpb::PrewriteRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { @@ -361,7 +362,7 @@ impl Shardable for kvrpcpb::CommitRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -452,7 +453,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.mutations = shard; Ok(()) } @@ -553,7 +554,7 @@ impl Shardable for kvrpcpb::ScanLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); self.start_key = shard.0; Ok(()) } @@ -614,7 +615,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); assert!(shard.len() == 1); self.primary_lock = shard.pop().unwrap(); Ok(()) @@ -672,7 +673,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.context = Some(store.region_with_leader.context()?); + self.set_context(store.region_with_leader.context()?); assert!(shard.len() == 1); self.primary_key = shard.pop().unwrap(); Ok(()) From 34b99f5a7eb0af64cbc6e3bf21a605643dcf0506 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 30 Aug 2023 11:30:32 +0800 Subject: [PATCH 7/7] add Codec parameter to Transaction & Snapshot Signed-off-by: Ping Yu --- src/transaction/client.rs | 10 +++++----- src/transaction/snapshot.rs | 9 ++++++--- src/transaction/transaction.rs | 17 +++++++++++------ 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/transaction/client.rs b/src/transaction/client.rs index e4d40618..610ea065 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -152,7 +152,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_optimistic(&self) -> Result>> { + pub async fn begin_optimistic(&self) -> Result>> { debug!("creating new optimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic())) @@ -175,7 +175,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_pessimistic(&self) -> Result>> { + pub async fn begin_pessimistic(&self) -> Result>> { debug!("creating new pessimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic())) @@ -201,7 +201,7 @@ impl Client { pub async fn begin_with_options( &self, options: TransactionOptions, - ) -> Result>> { + ) -> Result>> { debug!("creating new customized transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, options)) @@ -212,7 +212,7 @@ impl Client { &self, timestamp: Timestamp, options: TransactionOptions, - ) -> Snapshot> { + ) -> Snapshot> { debug!("creating new snapshot"); Snapshot::new(self.new_transaction(timestamp, options.read_only())) } @@ -311,7 +311,7 @@ impl Client { &self, timestamp: Timestamp, options: TransactionOptions, - ) -> Transaction> { + ) -> Transaction> { Transaction::new(timestamp, self.pd.clone(), options) } } diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index 2ae15769..a8aa9464 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -2,7 +2,9 @@ use derive_new::new; use log::debug; +use std::marker::PhantomData; +use crate::codec::ApiV1TxnCodec; use crate::pd::{PdClient, PdRpcClient}; use crate::request::codec::Codec; use crate::BoundRange; @@ -20,11 +22,12 @@ use crate::Value; /// /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods. #[derive(new)] -pub struct Snapshot { - transaction: Transaction, +pub struct Snapshot> { + transaction: Transaction, + phantom: PhantomData, } -impl> Snapshot { +impl> Snapshot { /// Get the value associated with the given key. pub async fn get(&mut self, key: impl Into) -> Result> { debug!("invoking get request on snapshot"); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index b28ea2b3..9317e171 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,6 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::iter; +use std::marker::PhantomData; use std::sync::Arc; use std::time::Instant; @@ -14,6 +15,7 @@ use tokio::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; +use crate::codec::ApiV1TxnCodec; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; @@ -74,7 +76,7 @@ use crate::Value; /// txn.commit().await.unwrap(); /// # }); /// ``` -pub struct Transaction { +pub struct Transaction> { status: Arc>, timestamp: Timestamp, buffer: Buffer, @@ -82,14 +84,15 @@ pub struct Transaction { options: TransactionOptions, is_heartbeat_started: bool, start_instant: Instant, + phantom: PhantomData, } -impl> Transaction { +impl> Transaction { pub(crate) fn new( timestamp: Timestamp, rpc: Arc, options: TransactionOptions, - ) -> Transaction { + ) -> Transaction { let status = if options.read_only { TransactionStatus::ReadOnly } else { @@ -103,6 +106,7 @@ impl> Transaction { options, is_heartbeat_started: false, start_instant: std::time::Instant::now(), + phantom: PhantomData, } } @@ -134,8 +138,9 @@ impl> Transaction { self.buffer .get_or_else(key, |key| async move { - let request = EncodedRequest::new(new_get_request(key, timestamp), rpc.get_codec()); - let plan = PlanBuilder::new(rpc, request) + let request = new_get_request(key, timestamp); + let encoded_req = EncodedRequest::new(request, rpc.get_codec()); + let plan = PlanBuilder::new(rpc, encoded_req) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -924,7 +929,7 @@ impl> Transaction { } } -impl Drop for Transaction { +impl Drop for Transaction { fn drop(&mut self) { debug!("dropping transaction"); if std::thread::panicking() {