diff --git a/src/lib.rs b/src/lib.rs index a2acf57b..60dc2956 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,8 +94,6 @@ pub mod backoff; #[doc(hidden)] -pub mod proto; // export `proto` to enable user customized codec -#[doc(hidden)] pub mod raw; pub mod request; #[doc(hidden)] @@ -106,6 +104,7 @@ mod compat; mod config; mod kv; mod pd; +mod proto; mod region; mod region_cache; mod stats; @@ -146,8 +145,6 @@ pub use crate::raw::Client as RawClient; #[doc(inline)] pub use crate::raw::ColumnFamily; #[doc(inline)] -pub use crate::request::codec; -#[doc(inline)] pub use crate::request::RetryOptions; #[doc(inline)] pub use crate::timestamp::Timestamp; diff --git a/src/mock.rs b/src/mock.rs index f9c94aef..38fea6b2 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -18,7 +18,6 @@ use crate::proto::metapb::RegionEpoch; use crate::proto::metapb::{self}; use crate::region::RegionId; use crate::region::RegionWithLeader; -use crate::request::codec::ApiV1TxnCodec; use crate::store::KvConnect; use crate::store::RegionStore; use crate::store::Request; @@ -31,7 +30,7 @@ use crate::Timestamp; /// Create a `PdRpcClient` with it's internals replaced with mocks so that the /// client can be tested without doing any RPC calls. -pub async fn pd_rpc_client() -> PdRpcClient { +pub async fn pd_rpc_client() -> PdRpcClient { let config = Config::default(); PdRpcClient::new( config.clone(), @@ -44,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient MockPdClient { - MockPdClient { - client, - codec: ApiV1TxnCodec::default(), - } - } } #[async_trait] @@ -113,7 +102,6 @@ impl MockPdClient { pub fn default() -> MockPdClient { MockPdClient { client: MockKvClient::default(), - codec: ApiV1TxnCodec::default(), } } @@ -177,7 +165,6 @@ impl MockPdClient { #[async_trait] impl PdClient for MockPdClient { - type Codec = ApiV1TxnCodec; type KvClient = MockKvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { @@ -227,8 +214,4 @@ impl PdClient for MockPdClient { } async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {} - - fn get_codec(&self) -> &Self::Codec { - &self.codec - } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 5461cb57..8eb4aa34 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -20,7 +20,6 @@ use crate::region::RegionId; use crate::region::RegionVerId; use crate::region::RegionWithLeader; use crate::region_cache::RegionCache; -use crate::request::codec::{ApiV1TxnCodec, Codec}; use crate::store::KvConnect; use crate::store::RegionStore; use crate::store::TikvConnect; @@ -51,7 +50,6 @@ use crate::Timestamp; /// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff. #[async_trait] pub trait PdClient: Send + Sync + 'static { - type Codec: Codec; type KvClient: KvClient + Send + Sync + 'static; /// In transactional API, `region` is decoded (keys in raw format). @@ -193,11 +191,8 @@ pub trait PdClient: Send + Sync + 'static { .boxed() } - fn decode_region( - mut region: RegionWithLeader, - enable_mvcc_codec: bool, - ) -> Result { - if enable_mvcc_codec { + fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result { + if enable_codec { codec::decode_bytes_in_place(&mut region.region.start_key, false)?; codec::decode_bytes_in_place(&mut region.region.end_key, false)?; } @@ -207,30 +202,20 @@ pub trait PdClient: Send + Sync + 'static { async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>; async fn invalidate_region_cache(&self, ver_id: RegionVerId); - - /// Get the codec carried by `PdClient`. - /// The purpose of carrying the codec is to avoid passing it on so many calling paths. - fn get_codec(&self) -> &Self::Codec; } /// This client converts requests for the logical TiKV cluster into requests /// for a single TiKV store using PD and internal logic. -pub struct PdRpcClient< - Cod: Codec = ApiV1TxnCodec, - KvC: KvConnect + Send + Sync + 'static = TikvConnect, - Cl = Cluster, -> { +pub struct PdRpcClient { pd: Arc>, kv_connect: KvC, kv_client_cache: Arc>>, - enable_mvcc_codec: bool, + enable_codec: bool, region_cache: RegionCache>, - codec: Option, } #[async_trait] -impl PdClient for PdRpcClient { - type Codec = Cod; +impl PdClient for PdRpcClient { type KvClient = KvC::KvClient; async fn map_region_to_store(self: Arc, region: RegionWithLeader) -> Result { @@ -241,20 +226,20 @@ impl PdClient for PdRpcClien } async fn region_for_key(&self, key: &Key) -> Result { - let enable_mvcc_codec = self.enable_mvcc_codec; - let key = if enable_mvcc_codec { + let enable_codec = self.enable_codec; + let key = if enable_codec { key.to_encoded() } else { key.clone() }; let region = self.region_cache.get_region_by_key(&key).await?; - Self::decode_region(region, enable_mvcc_codec) + Self::decode_region(region, enable_codec) } async fn region_for_id(&self, id: RegionId) -> Result { let region = self.region_cache.get_region_by_id(id).await?; - Self::decode_region(region, self.enable_mvcc_codec) + Self::decode_region(region, self.enable_codec) } async fn all_stores(&self) -> Result> { @@ -282,40 +267,31 @@ impl PdClient for PdRpcClien async fn invalidate_region_cache(&self, ver_id: RegionVerId) { self.region_cache.invalidate_region_cache(ver_id).await } - - fn get_codec(&self) -> &Self::Codec { - self.codec - .as_ref() - .unwrap_or_else(|| panic!("codec not set")) - } } -impl PdRpcClient { +impl PdRpcClient { pub async fn connect( pd_endpoints: &[String], config: Config, - enable_mvcc_codec: bool, // TODO: infer from `codec`. - codec: Option, - ) -> Result> { + enable_codec: bool, + ) -> Result { PdRpcClient::new( config.clone(), |security_mgr| TikvConnect::new(security_mgr, config.timeout), |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout), - enable_mvcc_codec, - codec, + enable_codec, ) .await } } -impl PdRpcClient { +impl PdRpcClient { pub async fn new( config: Config, kv_connect: MakeKvC, pd: MakePd, - enable_mvcc_codec: bool, - codec: Option, - ) -> Result> + enable_codec: bool, + ) -> Result> where PdFut: Future>>, MakeKvC: FnOnce(Arc) -> KvC, @@ -337,9 +313,8 @@ impl PdRpcClient PdRpcClient Err(e), } } - - pub fn set_codec(&mut self, codec: Cod) { - self.codec = Some(codec); - } } fn make_key_range(start_key: Vec, end_key: Vec) -> kvrpcpb::KeyRange { diff --git a/src/raw/client.rs b/src/raw/client.rs index fc733015..0bdc2f8b 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -15,7 +15,6 @@ use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::metapb; use crate::raw::lowering::*; -use crate::request::codec::{ApiV1RawCodec, Codec, EncodedRequest}; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Plan; @@ -36,11 +35,7 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; /// /// The returned results of raw request methods are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client> -where - Cod: Codec, - PdC: PdClient, -{ +pub struct Client { rpc: Arc, cf: Option, backoff: Backoff, @@ -59,7 +54,7 @@ impl Clone for Client { } } -impl Client> { +impl Client { /// Create a raw [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -105,10 +100,7 @@ impl Client> { config: Config, ) -> Result { let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let rpc = Arc::new( - PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1RawCodec::default())) - .await?, - ); + let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?); Ok(Client { rpc, cf: None, @@ -150,9 +142,7 @@ impl Client> { atomic: self.atomic, } } -} -impl Client> { /// Set the [`Backoff`] strategy for retrying requests. /// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF). /// See [`Backoff`] for more information. @@ -199,7 +189,7 @@ impl Client> { } } -impl> Client { +impl Client { /// Create a new 'get' request. /// /// Once resolved this request will result in the fetching of the value associated with the @@ -221,8 +211,7 @@ impl> Client { pub async fn get(&self, key: impl Into) -> Result> { debug!("invoking raw get request"); let request = new_raw_get_request(key.into(), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -254,8 +243,7 @@ impl> Client { ) -> Result> { debug!("invoking raw batch_get request"); let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); @@ -283,8 +271,7 @@ impl> Client { pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { debug!("invoking raw put request"); let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -320,8 +307,7 @@ impl> Client { self.cf.clone(), self.atomic, ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -349,8 +335,7 @@ impl> Client { pub async fn delete(&self, key: impl Into) -> Result<()> { debug!("invoking raw delete request"); let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .extract_error() @@ -381,8 +366,7 @@ impl> Client { self.assert_non_atomic()?; let request = new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -409,8 +393,7 @@ impl> Client { debug!("invoking raw delete_range request"); self.assert_non_atomic()?; let request = new_raw_delete_range_request(range.into(), self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .extract_error() .plan(); @@ -566,8 +549,7 @@ impl> Client { previous_value.into(), self.cf.clone(), ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) .retry_multi_region(self.backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -590,8 +572,7 @@ impl> Client { ranges.into_iter().map(Into::into), request_builder, ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req) .preserve_shard() .retry_multi_region(self.backoff.clone()) .post_process_default() @@ -625,8 +606,7 @@ impl> Client { while cur_limit > 0 { let request = new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let resp = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request) .single_region_with_store(region_store.clone()) .await? .plan() @@ -681,8 +661,7 @@ impl> Client { key_only, self.cf.clone(), ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(self.backoff.clone()) .merge(Collect) .plan(); diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 0be733cf..f9c64717 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -13,7 +13,6 @@ use super::RawRpcRequest; use crate::collect_first; use crate::pd::PdClient; use crate::proto::kvrpcpb; -use crate::proto::kvrpcpb::ApiVersion; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; @@ -164,7 +163,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); self.pairs = shard; Ok(()) } @@ -297,7 +296,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); self.ranges = shard; Ok(()) } @@ -403,7 +402,7 @@ impl Request for RawCoprocessorRequest { self.inner.set_context(context); } - fn set_api_version(&mut self, api_version: ApiVersion) { + fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { self.inner.set_api_version(api_version); } } @@ -505,7 +504,6 @@ mod test { use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; - use crate::request::codec::EncodedRequest; use crate::request::Plan; use crate::Key; @@ -540,8 +538,7 @@ mod test { key_only: true, ..Default::default() }; - let encoded_scan = EncodedRequest::new(scan, client.get_codec()); - let plan = crate::request::PlanBuilder::new(client, encoded_scan) + let plan = crate::request::PlanBuilder::new(client, scan) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) diff --git a/src/request/codec.rs b/src/request/codec.rs deleted file mode 100644 index a409a8e7..00000000 --- a/src/request/codec.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. - -use crate::proto::kvrpcpb; -use crate::request::KvRequest; - -pub trait Codec: Clone + Sync + Send + 'static { - fn encode_request(&self, _req: &mut R) {} - // TODO: fn decode_response() -} - -#[derive(Clone, Default)] -pub struct ApiV1TxnCodec {} - -impl Codec for ApiV1TxnCodec {} - -#[derive(Clone, Default)] -pub struct ApiV1RawCodec {} - -impl Codec for ApiV1RawCodec {} - -#[derive(Clone)] -pub struct ApiV2TxnCodec { - _keyspace_id: u32, -} - -impl ApiV2TxnCodec { - pub fn new(keyspace_id: u32) -> Self { - Self { - _keyspace_id: keyspace_id, - } - } -} - -impl Codec for ApiV2TxnCodec { - fn encode_request(&self, req: &mut R) { - req.set_api_version(kvrpcpb::ApiVersion::V2); - // TODO: req.encode_request(self); - } -} - -// TODO: pub struct ApiV2RawCodec - -// EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake. -#[derive(Clone)] -pub struct EncodedRequest { - pub inner: Req, -} - -impl EncodedRequest { - pub fn new(mut req: Req, codec: &C) -> Self { - codec.encode_request(&mut req); - Self { inner: req } - } -} diff --git a/src/request/mod.rs b/src/request/mod.rs index 8c3a45cb..d2d58ddb 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -33,7 +33,6 @@ use crate::store::Request; use crate::store::{HasKeyErrors, Store}; use crate::transaction::HasLocks; -pub mod codec; pub mod plan; mod plan_builder; mod shard; @@ -43,9 +42,6 @@ mod shard; pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static { /// The expected response to the request. type Response: HasKeyErrors + HasLocks + Clone + Send + 'static; - - // TODO: fn encode_request() - // TODO: fn decode_response() } /// For requests or plans which are handled at TiKV store (other than region) level. @@ -98,12 +94,9 @@ mod test { use super::*; use crate::mock::MockKvClient; use crate::mock::MockPdClient; - use crate::pd::PdClient; use crate::proto::kvrpcpb; - use crate::proto::kvrpcpb::ApiVersion; use crate::proto::pdpb::Timestamp; use crate::proto::tikvpb::tikv_client::TikvClient; - use crate::request::codec::EncodedRequest; use crate::store::store_stream_for_keys; use crate::store::HasRegionError; use crate::transaction::lowering::new_commit_request; @@ -153,7 +146,9 @@ mod test { unreachable!(); } - fn set_api_version(&mut self, _api_version: ApiVersion) {} + fn set_api_version(&mut self, _: kvrpcpb::ApiVersion) { + unreachable!(); + } } #[async_trait] @@ -199,8 +194,7 @@ mod test { |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box), ))); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3)) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() @@ -224,17 +218,16 @@ mod test { let key: Key = "key".to_owned().into(); let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default()); - let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); // does not extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone()) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone()) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); assert!(plan.execute().await.is_ok()); // extract error - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 8e2329e7..96f03dee 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use super::plan::PreserveShard; use crate::backoff::Backoff; use crate::pd::PdClient; -use crate::request::codec::EncodedRequest; use crate::request::plan::{CleanupLocks, RetryableAllStores}; use crate::request::shard::HasNextBatch; use crate::request::Dispatch; @@ -47,11 +46,11 @@ pub struct Targetted; impl PlanBuilderPhase for Targetted {} impl PlanBuilder, NoTarget> { - pub fn new(pd_client: Arc, encoded_request: EncodedRequest) -> Self { + pub fn new(pd_client: Arc, request: Req) -> Self { PlanBuilder { pd_client, plan: Dispatch { - request: encoded_request.inner, + request, kv_client: None, }, phantom: PhantomData, diff --git a/src/request/shard.rs b/src/request/shard.rs index ec234239..428e5d0e 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -164,7 +164,7 @@ macro_rules! shardable_key { mut shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); assert!(shard.len() == 1); self.key = shard.pop().unwrap(); Ok(()) @@ -197,7 +197,7 @@ macro_rules! shardable_keys { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -257,7 +257,7 @@ macro_rules! shardable_range { shard: Self::Shard, store: &$crate::store::RegionStore, ) -> $crate::Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. diff --git a/src/store/request.rs b/src/store/request.rs index e11fc8f1..5fa15c6d 100644 --- a/src/store/request.rs +++ b/src/store/request.rs @@ -21,8 +21,6 @@ pub trait Request: Any + Sync + Send + 'static { ) -> Result>; fn label(&self) -> &'static str; fn as_any(&self) -> &dyn Any; - /// Set the context for the request. - /// Should always use `set_context` other than modify the `self.context` directly. fn set_context(&mut self, context: kvrpcpb::Context); fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion); } @@ -55,13 +53,7 @@ macro_rules! impl_request { } fn set_context(&mut self, context: kvrpcpb::Context) { - let api_version = self - .context - .as_ref() - .map(|c| c.api_version) - .unwrap_or_default(); self.context = Some(context); - self.set_api_version(kvrpcpb::ApiVersion::try_from(api_version).unwrap()); } fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) { diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 4bcb16d9..1ba0be81 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -10,7 +10,6 @@ use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::pdpb::Timestamp; -use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest}; use crate::request::plan::CleanupLocksResult; use crate::request::Plan; use crate::timestamp::TimestampExt; @@ -44,11 +43,11 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be /// awaited to execute. -pub struct Client { - pd: Arc>, +pub struct Client { + pd: Arc, } -impl Clone for Client { +impl Clone for Client { fn clone(&self) -> Self { Self { pd: self.pd.clone(), @@ -56,7 +55,7 @@ impl Clone for Client { } } -impl Client { +impl Client { /// Create a transactional [`Client`] and connect to the TiKV cluster. /// /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for @@ -73,6 +72,7 @@ impl Client { /// # }); /// ``` pub async fn new>(pd_endpoints: Vec) -> Result { + // debug!("creating transactional client"); Self::new_with_config(pd_endpoints, Config::default()).await } @@ -101,35 +101,9 @@ impl Client { pd_endpoints: Vec, config: Config, ) -> Result { - Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await - } -} - -impl Client { - pub async fn new_with_config_v2>( - _keyspace_name: &str, - pd_endpoints: Vec, - config: Config, - ) -> Result> { - debug!("creating new transactional client APIv2"); - let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?; - let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name) - pd.set_codec(ApiV2TxnCodec::new(keyspace_id)); - Ok(Client { pd: Arc::new(pd) }) - } -} - -impl Client { - pub async fn new_with_codec>( - pd_endpoints: Vec, - config: Config, - codec: Cod, - ) -> Result> { debug!("creating new transactional client"); let pd_endpoints: Vec = pd_endpoints.into_iter().map(Into::into).collect(); - let pd = - Arc::new(PdRpcClient::::connect(&pd_endpoints, config, true, Some(codec)).await?); + let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?); Ok(Client { pd }) } @@ -153,7 +127,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_optimistic(&self) -> Result>> { + pub async fn begin_optimistic(&self) -> Result { debug!("creating new optimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic())) @@ -176,7 +150,7 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_pessimistic(&self) -> Result>> { + pub async fn begin_pessimistic(&self) -> Result { debug!("creating new pessimistic transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic())) @@ -199,21 +173,14 @@ impl Client { /// transaction.commit().await.unwrap(); /// # }); /// ``` - pub async fn begin_with_options( - &self, - options: TransactionOptions, - ) -> Result>> { + pub async fn begin_with_options(&self, options: TransactionOptions) -> Result { debug!("creating new customized transaction"); let timestamp = self.current_timestamp().await?; Ok(self.new_transaction(timestamp, options)) } /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp). - pub fn snapshot( - &self, - timestamp: Timestamp, - options: TransactionOptions, - ) -> Snapshot> { + pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { debug!("creating new snapshot"); Snapshot::new(self.new_transaction(timestamp, options.read_only())) } @@ -280,8 +247,7 @@ impl Client { let ctx = ResolveLocksContext::default(); let backoff = Backoff::equal_jitter_backoff(100, 10000, 50); let req = new_scan_lock_request(range.into(), safepoint, options.batch_size); - let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.pd.clone(), req) .cleanup_locks(ctx.clone(), options, backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() @@ -300,8 +266,7 @@ impl Client { batch_size: u32, ) -> Result> { let req = new_scan_lock_request(range.into(), safepoint, batch_size); - let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.pd.clone(), req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(crate::request::Collect) .plan(); @@ -317,19 +282,14 @@ impl Client { /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB. pub async fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { let req = new_unsafe_destroy_range_request(range.into()); - let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); - let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(self.pd.clone(), req) .all_stores(DEFAULT_STORE_BACKOFF) .merge(crate::request::Collect) .plan(); plan.execute().await } - fn new_transaction( - &self, - timestamp: Timestamp, - options: TransactionOptions, - ) -> Transaction> { + fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction { Transaction::new(timestamp, self.pd.clone(), options) } } diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index afb1d6c4..8d70d07e 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -17,7 +17,6 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::TxnInfo; use crate::proto::pdpb::Timestamp; use crate::region::RegionVerId; -use crate::request::codec::EncodedRequest; use crate::request::Collect; use crate::request::CollectSingle; use crate::request::Plan; @@ -76,8 +75,7 @@ pub async fn resolve_locks( Some(&commit_version) => commit_version, None => { let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -118,8 +116,8 @@ async fn resolve_lock_with_retry( let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); let request = requests::new_resolve_lock_request(start_version, commit_version); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + // The only place where single-region is used + let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .single_region_with_store(store) .await? .resolve_lock(Backoff::no_backoff()) @@ -359,8 +357,7 @@ impl LockResolver { force_sync_commit, resolving_pessimistic_lock, ); - let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .extract_error() @@ -384,8 +381,7 @@ impl LockResolver { txn_id: u64, ) -> Result { let req = new_check_secondary_locks_request(keys, txn_id); - let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), req) .retry_multi_region(DEFAULT_REGION_BACKOFF) .extract_error() .merge(Collect) @@ -401,8 +397,7 @@ impl LockResolver { ) -> Result { let ver_id = store.region_with_leader.ver_id(); let request = requests::new_batch_resolve_lock_request(txn_infos.clone()); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) + let plan = crate::request::PlanBuilder::new(pd_client.clone(), request) .single_region_with_store(store.clone()) .await? .extract_error() diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 4f3e1b93..c08025e0 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -39,7 +39,6 @@ use crate::shardable_keys; use crate::shardable_range; use crate::store::store_stream_for_range; use crate::store::RegionStore; -use crate::store::Request; use crate::store::{store_stream_for_keys, Store}; use crate::timestamp::TimestampExt; use crate::transaction::HasLocks; @@ -298,7 +297,7 @@ impl Shardable for kvrpcpb::PrewriteRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); // Only need to set secondary keys if we're sending the primary key. if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) { @@ -365,7 +364,7 @@ impl Shardable for kvrpcpb::CommitRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); self.keys = shard.into_iter().map(Into::into).collect(); Ok(()) } @@ -456,7 +455,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); self.mutations = shard; Ok(()) } @@ -557,7 +556,7 @@ impl Shardable for kvrpcpb::ScanLockRequest { } fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); self.start_key = shard.0; Ok(()) } @@ -618,7 +617,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); assert!(shard.len() == 1); self.primary_lock = shard.pop().unwrap(); Ok(()) @@ -676,7 +675,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest { } fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> { - self.set_context(store.region_with_leader.context()?); + self.context = Some(store.region_with_leader.context()?); assert!(shard.len() == 1); self.primary_key = shard.pop().unwrap(); Ok(()) diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index 0d1e4803..5694614b 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -2,11 +2,7 @@ use derive_new::new; use log::debug; -use std::marker::PhantomData; -use crate::codec::ApiV1TxnCodec; -use crate::pd::{PdClient, PdRpcClient}; -use crate::request::codec::Codec; use crate::BoundRange; use crate::Key; use crate::KvPair; @@ -22,12 +18,11 @@ use crate::Value; /// /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods. #[derive(new)] -pub struct Snapshot = PdRpcClient> { - transaction: Transaction, - phantom: PhantomData, +pub struct Snapshot { + transaction: Transaction, } -impl> Snapshot { +impl Snapshot { /// Get the value associated with the given key. pub async fn get(&mut self, key: impl Into) -> Result> { debug!("invoking get request on snapshot"); diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e984f153..e0f79170 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,7 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::iter; -use std::marker::PhantomData; use std::sync::atomic; use std::sync::atomic::AtomicU8; use std::sync::Arc; @@ -16,12 +15,10 @@ use tokio::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; -use crate::codec::ApiV1TxnCodec; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; use crate::proto::pdpb::Timestamp; -use crate::request::codec::{Codec, EncodedRequest}; use crate::request::Collect; use crate::request::CollectError; use crate::request::CollectSingle; @@ -77,7 +74,7 @@ use crate::Value; /// txn.commit().await.unwrap(); /// # }); /// ``` -pub struct Transaction = PdRpcClient> { +pub struct Transaction { status: Arc, timestamp: Timestamp, buffer: Buffer, @@ -85,15 +82,14 @@ pub struct Transaction = options: TransactionOptions, is_heartbeat_started: bool, start_instant: Instant, - phantom: PhantomData, } -impl> Transaction { +impl Transaction { pub(crate) fn new( timestamp: Timestamp, rpc: Arc, options: TransactionOptions, - ) -> Transaction { + ) -> Transaction { let status = if options.read_only { TransactionStatus::ReadOnly } else { @@ -107,7 +103,6 @@ impl> Transaction { options, is_heartbeat_started: false, start_instant: std::time::Instant::now(), - phantom: PhantomData, } } @@ -140,8 +135,7 @@ impl> Transaction { self.buffer .get_or_else(key, |key| async move { let request = new_get_request(key, timestamp); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc, encoded_req) + let plan = PlanBuilder::new(rpc, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -271,8 +265,7 @@ impl> Transaction { self.buffer .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move { let request = new_batch_get_request(keys, timestamp); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc, encoded_req) + let plan = PlanBuilder::new(rpc, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) @@ -745,8 +738,7 @@ impl> Transaction { primary_key, self.start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectSingle) @@ -776,8 +768,7 @@ impl> Transaction { move |new_range, new_limit| async move { let request = new_scan_request(new_range, timestamp, new_limit, key_only, reverse); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc, encoded_req) + let plan = PlanBuilder::new(rpc, request) .resolve_lock(retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) @@ -832,8 +823,7 @@ impl> Transaction { for_update_ts.clone(), need_value, ); - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) @@ -887,8 +877,7 @@ impl> Transaction { start_version, for_update_ts, ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = PlanBuilder::new(self.rpc.clone(), req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -957,8 +946,7 @@ impl> Transaction { primary_key.clone(), start_instant.elapsed().as_millis() as u64 + MAX_TTL, ); - let encoded_req = EncodedRequest::new(request, rpc.get_codec()); - let plan = PlanBuilder::new(rpc.clone(), encoded_req) + let plan = PlanBuilder::new(rpc.clone(), request) .retry_multi_region(region_backoff.clone()) .merge(CollectSingle) .plan(); @@ -1005,7 +993,7 @@ impl> Transaction { } } -impl> Drop for Transaction { +impl Drop for Transaction { fn drop(&mut self) { debug!("dropping transaction"); if std::thread::panicking() { @@ -1296,8 +1284,7 @@ impl Committer { .collect(); // FIXME set max_commit_ts and min_commit_ts - let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = PlanBuilder::new(self.rpc.clone(), request) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) @@ -1337,8 +1324,7 @@ impl Committer { self.start_version.clone(), commit_version.clone(), ); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) + let plan = PlanBuilder::new(self.rpc.clone(), req) .resolve_lock(self.options.retry_options.lock_backoff.clone()) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() @@ -1402,8 +1388,7 @@ impl Committer { .filter(|key| &primary_key != key); new_commit_request(keys, self.start_version, commit_version) }; - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc, encoded_req) + let plan = PlanBuilder::new(self.rpc, req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() @@ -1424,8 +1409,7 @@ impl Committer { match self.options.kind { TransactionKind::Optimistic => { let req = new_batch_rollback_request(keys, self.start_version); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc, encoded_req) + let plan = PlanBuilder::new(self.rpc, req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() @@ -1434,8 +1418,7 @@ impl Committer { } TransactionKind::Pessimistic(for_update_ts) => { let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); - let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); - let plan = PlanBuilder::new(self.rpc, encoded_req) + let plan = PlanBuilder::new(self.rpc, req) .resolve_lock(self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error()