Skip to content

Commit

Permalink
transaction: Support unsafe_destroy_range interface (#420)
Browse files Browse the repository at this point in the history
* add unsafe_destroy_range

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* fix compile error on lower version of rust

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Oct 7, 2023
1 parent 5ac72f2 commit d656079
Show file tree
Hide file tree
Showing 15 changed files with 338 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rand::thread_rng;
use rand::Rng;

pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
pub const DEFAULT_STORE_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 1000, 10);
pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);

Expand Down
6 changes: 5 additions & 1 deletion src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::request::codec::ApiV1TxnCodec;
use crate::store::KvClient;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
use crate::store::{KvClient, Store};
use crate::Config;
use crate::Error;
use crate::Key;
Expand Down Expand Up @@ -206,6 +206,10 @@ impl PdClient for MockPdClient {
}
}

/// Returns a single-element list: one `Store` wrapping a clone of `self.client`.
async fn all_stores(&self) -> Result<Vec<Store>> {
    let kv_client = Arc::new(self.client.clone());
    let store = Store::new(kv_client);
    Ok(vec![store])
}

async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
Ok(Timestamp::default())
}
Expand Down
14 changes: 13 additions & 1 deletion src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region_cache::RegionCache;
use crate::request::codec::{ApiV1TxnCodec, Codec};
use crate::store::KvClient;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::TikvConnect;
use crate::store::{KvClient, Store};
use crate::BoundRange;
use crate::Config;
use crate::Key;
Expand Down Expand Up @@ -78,6 +78,8 @@ pub trait PdClient: Send + Sync + 'static {
self.map_region_to_store(region).await
}

/// Returns a [`Store`] handle for each store that store-level requests should be sent to.
async fn all_stores(&self) -> Result<Vec<Store>>;

fn group_keys_by_region<K, K2>(
self: Arc<Self>,
keys: impl Iterator<Item = K> + Send + Sync + 'static,
Expand Down Expand Up @@ -255,6 +257,16 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
Self::decode_region(region, self.enable_mvcc_codec)
}

/// Lists the stores through the region cache's read-through lookup and builds a KV client
/// for each store address.
///
/// Clients are created one after another; the first failure aborts the call and is returned
/// as the error.
async fn all_stores(&self) -> Result<Vec<Store>> {
    let store_metas = self.region_cache.read_through_all_stores().await?;
    let mut connected = Vec::with_capacity(store_metas.len());
    for meta in store_metas {
        let kv_client = self.kv_client(&meta.address).await?;
        connected.push(Store::new(Arc::new(kv_client)));
    }
    Ok(connected)
}

async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
self.pd.clone().get_timestamp().await
}
Expand Down
1 change: 0 additions & 1 deletion src/pd/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ impl RetryClientTrait for RetryClient<Cluster> {
})
}

#[allow(dead_code)]
async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
retry!(self, "get_all_stores", |cluster| async {
cluster
Expand Down
64 changes: 64 additions & 0 deletions src/region_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,39 @@ impl<C: RetryClientTrait> RegionCache<C> {
cache.key_to_ver_id.remove(&start_key);
}
}

/// Fetches all stores from the inner client, keeps only those accepted by
/// `is_valid_tikv_store`, records them in the store cache and returns them.
///
/// # Errors
///
/// Returns the error from `get_all_stores` if the lookup fails; the cache is left untouched
/// in that case.
pub async fn read_through_all_stores(&self) -> Result<Vec<Store>> {
    let stores = self
        .inner_client
        .clone()
        .get_all_stores()
        .await?
        .into_iter()
        .filter(is_valid_tikv_store)
        .collect::<Vec<_>>();
    // Take the write lock once for the whole batch instead of re-acquiring it per store, so
    // the cache is updated in a single critical section.
    let mut store_cache = self.store_cache.write().await;
    for store in &stores {
        store_cache.insert(store.id, store.clone());
    }
    drop(store_cache);
    Ok(stores)
}
}

/// Key of the store label that `is_valid_tikv_store` inspects to detect non-TiKV engines.
const ENGINE_LABEL_KEY: &str = "engine";
/// Value of the `engine` label that causes a store to be rejected as TiFlash.
const ENGINE_LABEL_TIFLASH: &str = "tiflash";
/// Second `engine` label value that causes a store to be rejected as TiFlash.
const ENGINE_LABEL_TIFLASH_COMPUTE: &str = "tiflash_compute";

/// Returns `true` if `store` should be treated as a usable TiKV store.
///
/// A store is rejected when its state is `Tombstone`, or when it carries an `engine` label
/// whose value is `tiflash` or `tiflash_compute`. Any other state value, including one this
/// client's `StoreState` enum does not know, is accepted.
fn is_valid_tikv_store(store: &metapb::Store) -> bool {
    // Compare the raw wire value rather than decoding and unwrapping it: the state comes from
    // the network, and an unrecognized value must not panic the client.
    if store.state == metapb::StoreState::Tombstone as i32 {
        return false;
    }
    let is_tiflash = store.labels.iter().any(|label| {
        label.key == ENGINE_LABEL_KEY
            && (label.value == ENGINE_LABEL_TIFLASH || label.value == ENGINE_LABEL_TIFLASH_COMPUTE)
    });
    !is_tiflash
}

#[cfg(test)]
Expand All @@ -253,6 +286,7 @@ mod test {
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::region_cache::is_valid_tikv_store;
use crate::Key;
use crate::Result;

Expand Down Expand Up @@ -512,4 +546,34 @@ mod test {

region
}

#[test]
fn test_is_valid_tikv_store() {
    // Builds a store label from borrowed strings.
    let make_label = |key: &str, value: &str| metapb::StoreLabel {
        key: key.to_owned(),
        value: value.to_owned(),
    };

    // A default store has no labels and a non-tombstone state.
    let mut store = metapb::Store::default();
    assert!(is_valid_tikv_store(&store));

    // Tombstone stores are rejected; switching back to `Up` makes the store valid again.
    store.state = metapb::StoreState::Tombstone.into();
    assert!(!is_valid_tikv_store(&store));
    store.state = metapb::StoreState::Up.into();
    assert!(is_valid_tikv_store(&store));

    // Labels unrelated to the engine do not affect validity.
    store.labels.push(make_label("some_key", "some_value"));
    assert!(is_valid_tikv_store(&store));

    // Both TiFlash engine values are rejected.
    store.labels.push(make_label("engine", "tiflash"));
    assert!(!is_valid_tikv_store(&store));
    store.labels[1].value = "tiflash_compute".to_owned();
    assert!(!is_valid_tikv_store(&store));

    // Any other engine value is accepted.
    store.labels[1].value = "other".to_owned();
    assert!(is_valid_tikv_store(&store));
}
}
8 changes: 7 additions & 1 deletion src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::backoff::PESSIMISTIC_BACKOFF;
use crate::store::HasKeyErrors;
use crate::store::Request;
use crate::store::{HasKeyErrors, Store};
use crate::transaction::HasLocks;

pub mod codec;
Expand All @@ -47,6 +47,12 @@ pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
// TODO: fn decode_response()
}

/// For requests or plans which are handled at the TiKV store level rather than the region level.
pub trait StoreRequest {
/// Applies the request to the specified TiKV store.
fn apply_store(&mut self, store: &Store);
}

#[derive(Clone, Debug, new, Eq, PartialEq)]
pub struct RetryOptions {
/// How to retry when there is a region error and we need to resolve regions with PD.
Expand Down
101 changes: 98 additions & 3 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use crate::proto::errorpb;
use crate::proto::errorpb::EpochNotMatch;
use crate::proto::kvrpcpb;
use crate::request::shard::HasNextBatch;
use crate::request::KvRequest;
use crate::request::NextBatch;
use crate::request::Shardable;
use crate::request::{KvRequest, StoreRequest};
use crate::stats::tikv_stats;
use crate::store::HasKeyErrors;
use crate::store::HasRegionError;
use crate::store::HasRegionErrors;
use crate::store::KvClient;
use crate::store::RegionStore;
use crate::store::{HasKeyErrors, Store};
use crate::transaction::resolve_locks;
use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
Expand Down Expand Up @@ -73,7 +73,19 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
}
}

impl<Req: KvRequest + StoreRequest> StoreRequest for Dispatch<Req> {
fn apply_store(&mut self, store: &Store) {
self.kv_client = Some(store.client.clone());
self.request.apply_store(store);
}
}

const MULTI_REGION_CONCURRENCY: usize = 16;
/// Number of permits in the semaphore that gates concurrent per-store plan executions in
/// `RetryableAllStores`.
const MULTI_STORES_CONCURRENCY: usize = 16;

/// Returns `true` if `e` is one of the gRPC error variants (`Error::GrpcAPI` or `Error::Grpc`).
///
/// The retry loops in this module use it to decide whether a failed execution should be
/// retried after a backoff delay.
fn is_grpc_error(e: &Error) -> bool {
matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
}

pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
pub(super) inner: P,
Expand Down Expand Up @@ -150,7 +162,6 @@ where
let res = plan.execute().await;
drop(permit);

let is_grpc_error = |e: &Error| matches!(e, Error::GrpcAPI(_) | Error::Grpc(_));
let mut resp = match res {
Ok(resp) => resp,
Err(e) if is_grpc_error(&e) => {
Expand Down Expand Up @@ -354,6 +365,90 @@ where
}
}

/// A plan that runs its inner plan once against every store reported by the PD client.
pub struct RetryableAllStores<P: Plan, PdC: PdClient> {
// The plan that is cloned and bound to each store before being executed.
pub(super) inner: P,
// Used to list the stores the inner plan is sent to.
pub pd_client: Arc<PdC>,
// Cloned for each store; supplies the delays between retries after gRPC errors.
pub backoff: Backoff,
}

impl<P: Plan, PdC: PdClient> Clone for RetryableAllStores<P, PdC> {
fn clone(&self) -> Self {
RetryableAllStores {
inner: self.inner.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
}
}
}

// About `HasRegionError`:
// Store requests should not return region errors.
// But as the response of the only store request so far (`UnsafeDestroyRangeResponse`) has the
// `region_error` field, we require `HasRegionError` to check whether TiKV returned a region error.
#[async_trait]
impl<P: Plan + StoreRequest, PdC: PdClient> Plan for RetryableAllStores<P, PdC>
where
    P::Result: HasKeyErrors + HasRegionError,
{
    type Result = Vec<Result<P::Result>>;

    /// Runs the inner plan against every store returned by the PD client.
    ///
    /// One task is spawned per store; at most `MULTI_STORES_CONCURRENCY` of them execute the
    /// plan at the same time. The returned vector holds one `Result` per store, so a failure
    /// on one store does not hide the outcome of the others.
    ///
    /// # Errors
    ///
    /// Fails as a whole only if listing the stores fails or a spawned task cannot be joined.
    async fn execute(&self) -> Result<Self::Result> {
        let concurrency_permits = Arc::new(Semaphore::new(MULTI_STORES_CONCURRENCY));
        // `all_stores` takes `&self`, so the `Arc` does not need to be cloned for the call.
        let stores = self.pd_client.all_stores().await?;
        let mut handles = Vec::with_capacity(stores.len());
        for store in stores {
            // Each store gets its own copy of the plan, bound to that store's client.
            let mut plan = self.inner.clone();
            plan.apply_store(&store);
            handles.push(tokio::spawn(Self::single_store_handler(
                plan,
                self.backoff.clone(),
                concurrency_permits.clone(),
            )));
        }
        // `try_join_all` already yields the per-store results as a `Vec`; `?` only surfaces
        // a failure to join a task.
        let results = try_join_all(handles).await?;
        Ok(results)
    }
}

impl<P: Plan, PdC: PdClient> RetryableAllStores<P, PdC>
where
P::Result: HasKeyErrors + HasRegionError,
{
async fn single_store_handler(
plan: P,
mut backoff: Backoff,
permits: Arc<Semaphore>,
) -> Result<P::Result> {
loop {
let permit = permits.acquire().await.unwrap();
let res = plan.execute().await;
drop(permit);

match res {
Ok(mut resp) => {
if let Some(e) = resp.key_errors() {
return Err(Error::MultipleKeyErrors(e));
} else if let Some(e) = resp.region_error() {
// Store request should not return region error.
return Err(Error::RegionError(Box::new(e)));
} else {
return Ok(resp);
}
}
Err(e) if is_grpc_error(&e) => match backoff.next_delay_duration() {
Some(duration) => {
sleep(duration).await;
continue;
}
None => return Err(e),
},
Err(e) => return Err(e),
}
}
}
}

/// A technique for merging responses into a single result (with type `Out`).
pub trait Merge<In>: Sized + Clone + Send + Sync + 'static {
type Out: Send;
Expand Down
24 changes: 22 additions & 2 deletions src/request/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use super::plan::PreserveShard;
use crate::backoff::Backoff;
use crate::pd::PdClient;
use crate::request::codec::EncodedRequest;
use crate::request::plan::CleanupLocks;
use crate::request::plan::{CleanupLocks, RetryableAllStores};
use crate::request::shard::HasNextBatch;
use crate::request::DefaultProcessor;
use crate::request::Dispatch;
use crate::request::ExtractError;
use crate::request::KvRequest;
Expand All @@ -22,6 +21,7 @@ use crate::request::ProcessResponse;
use crate::request::ResolveLock;
use crate::request::RetryableMultiRegion;
use crate::request::Shardable;
use crate::request::{DefaultProcessor, StoreRequest};
use crate::store::HasKeyErrors;
use crate::store::HasRegionError;
use crate::store::HasRegionErrors;
Expand Down Expand Up @@ -194,6 +194,26 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
}
}

impl<PdC: PdClient, P: Plan + StoreRequest> PlanBuilder<PdC, P, NoTarget>
where
    P::Result: HasKeyErrors + HasRegionError,
{
    /// Wraps the current plan in a `RetryableAllStores` that uses `backoff` and this
    /// builder's PD client, and marks the builder as targetted.
    pub fn all_stores(
        self,
        backoff: Backoff,
    ) -> PlanBuilder<PdC, RetryableAllStores<P, PdC>, Targetted> {
        let PlanBuilder {
            pd_client,
            plan: inner,
            ..
        } = self;
        let plan = RetryableAllStores {
            inner,
            pd_client: pd_client.clone(),
            backoff,
        };
        PlanBuilder {
            pd_client,
            plan,
            phantom: PhantomData,
        }
    }
}

impl<PdC: PdClient, P: Plan + Shardable> PlanBuilder<PdC, P, NoTarget>
where
P::Result: HasKeyErrors,
Expand Down
2 changes: 2 additions & 0 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ has_region_error!(kvrpcpb::CheckTxnStatusResponse);
has_region_error!(kvrpcpb::CheckSecondaryLocksResponse);
has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::GcResponse);
has_region_error!(kvrpcpb::UnsafeDestroyRangeResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
has_region_error!(kvrpcpb::RawPutResponse);
Expand Down Expand Up @@ -111,6 +112,7 @@ has_str_error!(kvrpcpb::RawCasResponse);
has_str_error!(kvrpcpb::RawCoprocessorResponse);
has_str_error!(kvrpcpb::ImportResponse);
has_str_error!(kvrpcpb::DeleteRangeResponse);
has_str_error!(kvrpcpb::UnsafeDestroyRangeResponse);

impl HasKeyErrors for kvrpcpb::ScanResponse {
fn key_errors(&mut self) -> Option<Vec<Error>> {
Expand Down
5 changes: 5 additions & 0 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ pub struct RegionStore {
pub client: Arc<dyn KvClient + Send + Sync>,
}

/// A handle to a single store, used for requests that are sent to a store as a whole.
#[derive(new, Clone)]
pub struct Store {
// Client used to send requests to this store.
pub client: Arc<dyn KvClient + Send + Sync>,
}

#[async_trait]
pub trait KvConnectStore: KvConnect {
async fn connect_to_store(
Expand Down
Loading

0 comments on commit d656079

Please sign in to comment.