Skip to content

Commit

Permalink
Revert "*: Support API v2 (part 1) (#415)"
Browse files Browse the repository at this point in the history
This reverts commit 4b0e844.
  • Loading branch information
andylokandy committed Dec 14, 2023
1 parent bbaf317 commit 4db9895
Show file tree
Hide file tree
Showing 15 changed files with 95 additions and 306 deletions.
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@

pub mod backoff;
#[doc(hidden)]
pub mod proto; // export `proto` to enable user customized codec
#[doc(hidden)]
pub mod raw;
pub mod request;
#[doc(hidden)]
Expand All @@ -106,6 +104,7 @@ mod compat;
mod config;
mod kv;
mod pd;
mod proto;
mod region;
mod region_cache;
mod stats;
Expand Down Expand Up @@ -146,8 +145,6 @@ pub use crate::raw::Client as RawClient;
#[doc(inline)]
pub use crate::raw::ColumnFamily;
#[doc(inline)]
pub use crate::request::codec;
#[doc(inline)]
pub use crate::request::RetryOptions;
#[doc(inline)]
pub use crate::timestamp::Timestamp;
Expand Down
21 changes: 2 additions & 19 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::request::codec::ApiV1TxnCodec;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
Expand All @@ -31,7 +30,7 @@ use crate::Timestamp;

/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
PdRpcClient::new(
config.clone(),
Expand All @@ -44,7 +43,6 @@ pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCl
))
},
false,
Some(ApiV1TxnCodec::default()),
)
.await
.unwrap()
Expand Down Expand Up @@ -73,18 +71,9 @@ pub struct MockKvConnect;

pub struct MockCluster;

#[derive(new)]
pub struct MockPdClient {
client: MockKvClient,
codec: ApiV1TxnCodec,
}

impl MockPdClient {
pub fn new(client: MockKvClient) -> MockPdClient {
MockPdClient {
client,
codec: ApiV1TxnCodec::default(),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -113,7 +102,6 @@ impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
codec: ApiV1TxnCodec::default(),
}
}

Expand Down Expand Up @@ -177,7 +165,6 @@ impl MockPdClient {

#[async_trait]
impl PdClient for MockPdClient {
type Codec = ApiV1TxnCodec;
type KvClient = MockKvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Expand Down Expand Up @@ -227,8 +214,4 @@ impl PdClient for MockPdClient {
}

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

fn get_codec(&self) -> &Self::Codec {
&self.codec
}
}
63 changes: 17 additions & 46 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region_cache::RegionCache;
use crate::request::codec::{ApiV1TxnCodec, Codec};
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::TikvConnect;
Expand Down Expand Up @@ -51,7 +50,6 @@ use crate::Timestamp;
/// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
#[async_trait]
pub trait PdClient: Send + Sync + 'static {
type Codec: Codec;
type KvClient: KvClient + Send + Sync + 'static;

/// In transactional API, `region` is decoded (keys in raw format).
Expand Down Expand Up @@ -193,11 +191,8 @@ pub trait PdClient: Send + Sync + 'static {
.boxed()
}

fn decode_region(
mut region: RegionWithLeader,
enable_mvcc_codec: bool,
) -> Result<RegionWithLeader> {
if enable_mvcc_codec {
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
if enable_codec {
codec::decode_bytes_in_place(&mut region.region.start_key, false)?;
codec::decode_bytes_in_place(&mut region.region.end_key, false)?;
}
Expand All @@ -207,30 +202,20 @@ pub trait PdClient: Send + Sync + 'static {
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

/// Get the codec carried by `PdClient`.
/// The purpose of carrying the codec is to avoid passing it on so many calling paths.
fn get_codec(&self) -> &Self::Codec;
}

/// This client converts requests for the logical TiKV cluster into requests
/// for a single TiKV store using PD and internal logic.
pub struct PdRpcClient<
Cod: Codec = ApiV1TxnCodec,
KvC: KvConnect + Send + Sync + 'static = TikvConnect,
Cl = Cluster,
> {
pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
pd: Arc<RetryClient<Cl>>,
kv_connect: KvC,
kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
enable_mvcc_codec: bool,
enable_codec: bool,
region_cache: RegionCache<RetryClient<Cl>>,
codec: Option<Cod>,
}

#[async_trait]
impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<Cod, KvC> {
type Codec = Cod;
impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
type KvClient = KvC::KvClient;

async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Expand All @@ -241,20 +226,20 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
}

async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let enable_mvcc_codec = self.enable_mvcc_codec;
let key = if enable_mvcc_codec {
let enable_codec = self.enable_codec;
let key = if enable_codec {
key.to_encoded()
} else {
key.clone()
};

let region = self.region_cache.get_region_by_key(&key).await?;
Self::decode_region(region, enable_mvcc_codec)
Self::decode_region(region, enable_codec)
}

async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
let region = self.region_cache.get_region_by_id(id).await?;
Self::decode_region(region, self.enable_mvcc_codec)
Self::decode_region(region, self.enable_codec)
}

async fn all_stores(&self) -> Result<Vec<Store>> {
Expand Down Expand Up @@ -282,40 +267,31 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
self.region_cache.invalidate_region_cache(ver_id).await
}

fn get_codec(&self) -> &Self::Codec {
self.codec
.as_ref()
.unwrap_or_else(|| panic!("codec not set"))
}
}

impl<Cod: Codec> PdRpcClient<Cod, TikvConnect, Cluster> {
impl PdRpcClient<TikvConnect, Cluster> {
pub async fn connect(
pd_endpoints: &[String],
config: Config,
enable_mvcc_codec: bool, // TODO: infer from `codec`.
codec: Option<Cod>,
) -> Result<PdRpcClient<Cod>> {
enable_codec: bool,
) -> Result<PdRpcClient> {
PdRpcClient::new(
config.clone(),
|security_mgr| TikvConnect::new(security_mgr, config.timeout),
|security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
enable_mvcc_codec,
codec,
enable_codec,
)
.await
}
}

impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, KvC, Cl> {
impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
pub async fn new<PdFut, MakeKvC, MakePd>(
config: Config,
kv_connect: MakeKvC,
pd: MakePd,
enable_mvcc_codec: bool,
codec: Option<Cod>,
) -> Result<PdRpcClient<Cod, KvC, Cl>>
enable_codec: bool,
) -> Result<PdRpcClient<KvC, Cl>>
where
PdFut: Future<Output = Result<RetryClient<Cl>>>,
MakeKvC: FnOnce(Arc<SecurityManager>) -> KvC,
Expand All @@ -337,9 +313,8 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
pd: pd.clone(),
kv_client_cache,
kv_connect: kv_connect(security_mgr),
enable_mvcc_codec,
enable_codec,
region_cache: RegionCache::new(pd),
codec,
})
}

Expand All @@ -359,10 +334,6 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
Err(e) => Err(e),
}
}

pub fn set_codec(&mut self, codec: Cod) {
self.codec = Some(codec);
}
}

fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
Expand Down
Loading

0 comments on commit 4db9895

Please sign in to comment.